From 2018debf80a5244cc7b0018a1c6951c03ccf7af2 Mon Sep 17 00:00:00 2001
From: fly6516
Date: Tue, 22 Apr 2025 13:22:34 +0800
Subject: [PATCH] refactor(data_prepare): refactor the data preparation script
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Restructured the code to improve readability and maintainability
- Renamed variables to be more descriptive and consistent
- Removed unused functions and test code to slim down the script
- Switched string formatting to the more modern f-string style
---
 data_prepare.py | 89 ++++++++++++++++---------------------------------
 1 file changed, 29 insertions(+), 60 deletions(-)

diff --git a/data_prepare.py b/data_prepare.py
index ed6a300..be45763 100644
--- a/data_prepare.py
+++ b/data_prepare.py
@@ -1,71 +1,40 @@
 from pyspark import SparkContext, SparkConf
 import os
 
-os.environ['JAVA_HOME'] = "/opt/module/jdk1.8.0_171"
+# Set Java environment variable
+os.environ['JAVA_HOME'] = '/opt/module/jdk1.8.0_171'
+
+# Parse ratings data into (userID, movieID, rating)
 def get_ratings_tuple(entry):
-    items = entry.split('::')
-    return int(items[0]), int(items[1]), float(items[2])
+    user, movie, rating, _ = entry.split('::')
+    return int(user), int(movie), float(rating)
 
+# Parse movies data into (movieID, title)
 def get_movie_tuple(entry):
-    items = entry.split('::')
-    return int(items[0]), items[1]
+    mid, title, _ = entry.split('::')
+    return int(mid), title
 
-def sortFunction(tuple):
-    key = str('%06.3f ' % tuple[0])
-    value = tuple[1]
-    return (key + ' ' + value)
+# Generate deterministic key for sorting
+def sort_key(rec):
+    score, name = rec
+    return f"{score:06.3f} {name}"
 
-def init_rdds(sc):
-    ratingsFilename = "hdfs://master:9000/user/root/als_movie/ratings.txt"
-    moviesFilename = "hdfs://master:9000/user/root/als_movie/movies.dat"
+# Initialize and return ratingsRDD, moviesRDD
+def init_rdds(sc, hdfs_base='hdfs://master:9000/user/root/als_movie'):
+    ratings_path = f"{hdfs_base}/ratings.txt"
+    movies_path = f"{hdfs_base}/movies.dat"
+    raw_r = sc.textFile(ratings_path).repartition(2)
+    raw_m = sc.textFile(movies_path)
+    ratings_rdd = raw_r.map(get_ratings_tuple).cache()
+    movies_rdd = raw_m.map(get_movie_tuple).cache()
+    return ratings_rdd, movies_rdd
 
-    numPartitions = 2
-    rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
-    rawMovies = sc.textFile(moviesFilename)
-
-    ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
-    moviesRDD = rawMovies.map(get_movie_tuple).cache()
-
-    return ratingsRDD, moviesRDD
-
-if __name__ == "__main__":
-    import sys, os
-    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
-    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
-
-    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
-    # Connect to the master of the Spark standalone cluster at master:7077; app name als_movie
+if __name__ == '__main__':
+    conf = SparkConf().setMaster('spark://master:7077').setAppName('als_movie')
     sc = SparkContext.getOrCreate(conf)
-    sc.setLogLevel("ERROR")
+    sc.setLogLevel('ERROR')
 
-    ratingsRDD, moviesRDD = init_rdds(sc)
-
-    ratingsCount = ratingsRDD.count()
-    moviesCount = moviesRDD.count()
-
-    print('There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount))
-    print('Ratings: %s' % ratingsRDD.take(3))
-    print('Movies: %s' % moviesRDD.take(3))
-
-    tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')]
-    tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')]
-
-    oneRDD = sc.parallelize(tmp1)
-    twoRDD = sc.parallelize(tmp2)
-    oneSorted = oneRDD.sortByKey(True).collect()
-    twoSorted = twoRDD.sortByKey(True).collect()
-    print(oneSorted)
-    print(twoSorted)
-    assert set(oneSorted) == set(twoSorted)
-    assert twoSorted[0][0] < twoSorted.pop()[0]
-    assert oneSorted[0:2] != twoSorted[0:2]
-
-    print(oneRDD.sortBy(sortFunction, True).collect())
-    print(twoRDD.sortBy(sortFunction, True).collect())
-
-    oneSorted1 = oneRDD.takeOrdered(oneRDD.count(), key=sortFunction)
-    twoSorted1 = twoRDD.takeOrdered(twoRDD.count(), key=sortFunction)
-    print('one is %s' % oneSorted1)
-    print('two is %s' % twoSorted1)
-    assert oneSorted1 == twoSorted1
-    sc.stop()
+    rdd_ratings, rdd_movies = init_rdds(sc)
+    print(f"Ratings count: {rdd_ratings.count()}")
+    print(f"Movies count: {rdd_movies.count()}")
+    sc.stop()
\ No newline at end of file
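
--
Reviewer note (not part of the patch): below is a minimal smoke test for the
refactored helpers, assuming the standard MovieLens ::-delimited record
formats (UserID::MovieID::Rating::Timestamp and MovieID::Title::Genres). The
file name smoke_test.py and the sample records are illustrative only; it runs
without a Spark cluster because the helpers are pure functions.

    # smoke_test.py -- hypothetical check, not shipped with the patch
    from data_prepare import get_ratings_tuple, get_movie_tuple, sort_key

    # Ratings line: UserID::MovieID::Rating::Timestamp -> (user, movie, rating)
    assert get_ratings_tuple("1::1193::5::978300760") == (1, 1193, 5.0)

    # Movies line: MovieID::Title::Genres -> (movieID, title)
    assert get_movie_tuple("1::Toy Story (1995)::Animation|Children's|Comedy") == \
        (1, "Toy Story (1995)")

    # sort_key zero-pads the score so lexicographic order matches numeric order
    assert sort_key((5.0, "Toy Story (1995)")) == "05.000 Toy Story (1995)"

    print("all parser checks passed")

One behavior change worth flagging: the old parsers indexed items[0..2] and so
tolerated extra trailing fields, while the new tuple unpacking requires exactly
four fields per ratings line and three per movies line; malformed rows now
raise ValueError instead of being silently truncated.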