import os

# Environment for the Spark driver and executors.  JAVA_HOME must point at the
# JDK the cluster uses; PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON pin the same
# Python interpreter on workers and driver so serialized closures match.
# (Previously these two PYSPARK_* variables were set twice — once here and
# again inside __main__ — set them exactly once at module import.)
os.environ["JAVA_HOME"] = "/opt/module/jdk1.8.0_171"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"


def get_ratings_tuple(entry):
    """Parse one '::'-delimited ratings line into (userID, movieID, rating).

    Only the first three fields are used; any trailing fields (e.g. a
    timestamp) are ignored.
    """
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])


def get_movie_tuple(entry):
    """Parse one '::'-delimited movies line into (movieID, title).

    Fields after the title (e.g. genres) are ignored.
    """
    items = entry.split('::')
    return int(items[0]), items[1]


def sortFunction(record):
    """Build a deterministic string sort key for a (rating, name) pair.

    The numeric part is rendered with '%06.3f' so lexicographic comparison
    matches numeric order; the name is appended to break ties, making the
    ordering total (plain sortByKey is not deterministic for duplicate keys).
    """
    # NOTE: parameter renamed from `tuple` (shadowed the builtin).
    key = str('%06.3f ' % record[0])
    value = record[1]
    return (key + ' ' + value)


def init_rdds(sc):
    """Load the ratings and movies datasets from HDFS.

    Returns a pair of cached RDDs:
    (user, movie, rating) tuples and (movie, title) tuples.
    """
    ratingsFilename = "hdfs://master:9000/user/root/als_movie/ratings.txt"
    moviesFilename = "hdfs://master:9000/user/root/als_movie/movies.dat"

    numPartitions = 2
    rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
    rawMovies = sc.textFile(moviesFilename)

    ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
    moviesRDD = rawMovies.map(get_movie_tuple).cache()

    return ratingsRDD, moviesRDD


if __name__ == "__main__":
    # Imported here so the pure parsing helpers above stay importable (and
    # unit-testable) on machines without pyspark installed.
    from pyspark import SparkContext, SparkConf

    # Connect to the standalone Spark master at master:7077; app name als_movie.
    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel("ERROR")

    ratingsRDD, moviesRDD = init_rdds(sc)

    ratingsCount = ratingsRDD.count()
    moviesCount = moviesRDD.count()

    print('There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount))
    print('Ratings: %s' % ratingsRDD.take(3))
    print('Movies: %s' % moviesRDD.take(3))

    # Sanity checks: sortByKey alone is not a total order when keys repeat,
    # so two RDDs holding the same elements may collect in different orders.
    tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')]
    tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')]

    oneRDD = sc.parallelize(tmp1)
    twoRDD = sc.parallelize(tmp2)
    oneSorted = oneRDD.sortByKey(True).collect()
    twoSorted = twoRDD.sortByKey(True).collect()
    print(oneSorted)
    print(twoSorted)
    assert set(oneSorted) == set(twoSorted)
    assert twoSorted[0][0] < twoSorted.pop()[0]
    assert oneSorted[0:2] != twoSorted[0:2]

    # A composite (rating + name) key makes the ordering fully deterministic.
    print(oneRDD.sortBy(sortFunction, True).collect())
    print(twoRDD.sortBy(sortFunction, True).collect())

    oneSorted1 = oneRDD.takeOrdered(oneRDD.count(), key=sortFunction)
    twoSorted1 = twoRDD.takeOrdered(twoRDD.count(), key=sortFunction)
    print('one is %s' % oneSorted1)
    print('two is %s' % twoSorted1)
    assert oneSorted1 == twoSorted1
    sc.stop()