fix: 设置 PySpark 的 Python环境变量
- 添加 PYSPARK_PYTHON 和 PYSPARK_DRIVER_PYTHON 环境变量设置 - 指定 Python 3.6 作为 PySpark 的 Python 版本
This commit is contained in:
parent
5c00db57ff
commit
67d34ad0ba
73
data_prepare-0.py
Normal file
73
data_prepare-0.py
Normal file
@ -0,0 +1,73 @@
|
||||
from pyspark import SparkContext, SparkConf
|
||||
import os
|
||||
# Runtime environment for Spark: pin the JDK install and force both the
# executors (PYSPARK_PYTHON) and the driver (PYSPARK_DRIVER_PYTHON) to the
# system python3 interpreter so worker/driver Python versions match.
_SPARK_ENV = {
    "JAVA_HOME": "/opt/module/jdk1.8.0_171",
    "PYSPARK_PYTHON": "/usr/bin/python3",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
}
os.environ.update(_SPARK_ENV)
|
||||
|
||||
def get_ratings_tuple(entry):
    """Parse one '::'-delimited ratings line into (userID, movieID, rating).

    Only the first three fields are used; any trailing fields (e.g. a
    timestamp) are ignored.
    """
    user_id, movie_id, rating = entry.split('::')[:3]
    return int(user_id), int(movie_id), float(rating)
|
||||
|
||||
def get_movie_tuple(entry):
    """Parse one '::'-delimited movies line into (movieID, title).

    Fields after the title (e.g. the genres list) are ignored.
    """
    movie_id, title = entry.split('::')[:2]
    return int(movie_id), title
|
||||
|
||||
def sortFunction(tuple):
    """Serialize a (rating, name) pair into a single sortable string key.

    The numeric rating is zero-padded to a fixed width so that lexicographic
    string order matches numeric order, giving a deterministic total order
    even when ratings tie. NOTE: '%06.3f ' already ends in a space and a
    second ' ' is inserted, so key and value are separated by two spaces;
    this is preserved as-is because sorting only needs the format to be
    applied consistently.
    """
    rating_key = '%06.3f ' % tuple[0]
    return rating_key + ' ' + tuple[1]
|
||||
|
||||
def init_rdds(sc,
              ratings_path="hdfs://master:9000/user/root/als_movie/ratings.txt",
              movies_path="hdfs://master:9000/user/root/als_movie/movies.dat",
              num_partitions=2):
    """Load and parse the ratings and movies datasets into cached RDDs.

    The previously hard-coded HDFS locations and partition count are now
    keyword parameters (defaults unchanged), so callers can point at other
    datasets without editing this function; existing `init_rdds(sc)` calls
    keep working.

    Args:
        sc: an active SparkContext used to read the input files.
        ratings_path: '::'-delimited ratings file (user::movie::rating...).
        movies_path: '::'-delimited movies file (movieID::title...).
        num_partitions: partition count for the ratings RDD.

    Returns:
        (ratingsRDD, moviesRDD): ratingsRDD of (userID, movieID, rating)
        tuples, moviesRDD of (movieID, title) tuples. Both are cached
        because they are traversed multiple times downstream.
    """
    rawRatings = sc.textFile(ratings_path).repartition(num_partitions)
    rawMovies = sc.textFile(movies_path)

    ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
    moviesRDD = rawMovies.map(get_movie_tuple).cache()

    return ratingsRDD, moviesRDD
|
||||
|
||||
if __name__ == "__main__":
    # PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON / JAVA_HOME are already exported
    # at module import time, so the former duplicate `import sys, os` and
    # os.environ re-assignments here were redundant and have been removed
    # (`sys` was never used).

    # Connect to the standalone Spark master at master:7077; app name "als_movie".
    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel("ERROR")

    ratingsRDD, moviesRDD = init_rdds(sc)

    ratingsCount = ratingsRDD.count()
    moviesCount = moviesRDD.count()

    print('There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount))
    print('Ratings: %s' % ratingsRDD.take(3))
    print('Movies: %s' % moviesRDD.take(3))

    # Sanity checks: sorting by key alone is not a total order when keys tie,
    # so two RDDs holding the same elements can collect in different sequences.
    tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')]
    tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')]

    oneRDD = sc.parallelize(tmp1)
    twoRDD = sc.parallelize(tmp2)
    oneSorted = oneRDD.sortByKey(True).collect()
    twoSorted = twoRDD.sortByKey(True).collect()
    print(oneSorted)
    print(twoSorted)
    assert set(oneSorted) == set(twoSorted)     # same elements either way
    # First key strictly below last key; was `twoSorted.pop()[0]`, rewritten
    # as an index so the check no longer mutates the collected list.
    assert twoSorted[0][0] < twoSorted[-1][0]
    assert oneSorted[0:2] != twoSorted[0:2]     # tied keys may order differently

    # sortFunction folds rating AND name into one string key, which makes the
    # ordering deterministic: both RDDs must now agree exactly.
    print(oneRDD.sortBy(sortFunction, True).collect())
    print(twoRDD.sortBy(sortFunction, True).collect())

    oneSorted1 = oneRDD.takeOrdered(oneRDD.count(), key=sortFunction)
    twoSorted1 = twoRDD.takeOrdered(twoRDD.count(), key=sortFunction)
    print('one is %s' % oneSorted1)
    print('two is %s' % twoSorted1)
    assert oneSorted1 == twoSorted1
    sc.stop()
|
Loading…
Reference in New Issue
Block a user