From 3cb7ec6dba5c6c3fec94cd2ab58c2c69fffdbec7 Mon Sep 17 00:00:00 2001 From: fly6516 Date: Tue, 22 Apr 2025 13:12:18 +0800 Subject: [PATCH] =?UTF-8?q?feat(data):=20=E6=B7=BB=E5=8A=A0=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=87=86=E5=A4=87=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 data_prepare.py 文件,用于初始化和处理电影评分数据 - 实现了从 HDFS 读取 ratings 和 movies 数据的功能 - 提供了数据解析和缓存的逻辑,为后续处理做准备 --- data_prepare.py | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 data_prepare.py diff --git a/data_prepare.py b/data_prepare.py new file mode 100644 index 0000000..81124e6 --- /dev/null +++ b/data_prepare.py @@ -0,0 +1,40 @@ +from pyspark import SparkContext, SparkConf +import os + +# 设置 Java 环境变量 +os.environ['JAVA_HOME'] = '/opt/module/jdk1.8.0_171' + +# 解析 ratings 行为 (userID, movieID, rating) +def get_ratings_tuple(entry): + user, movie, rating, _ = entry.split('::') + return int(user), int(movie), float(rating) + +# 解析 movies 行为 (movieID, title) +def get_movie_tuple(entry): + mid, title, _ = entry.split('::') + return int(mid), title + +# 用于排序时生成确定性键 +def sort_key(rec): + score, name = rec + return f"{score:06.3f} {name}" + +# 初始化并返回 ratingsRDD, moviesRDD +def init_rdds(sc, hdfs_base='hdfs://master:9000/user/root/als_movie'): + ratings_path = f"{hdfs_base}/ratings.txt" + movies_path = f"{hdfs_base}/movies.dat" + raw_r = sc.textFile(ratings_path).repartition(2) + raw_m = sc.textFile(movies_path) + ratings_rdd = raw_r.map(get_ratings_tuple).cache() + movies_rdd = raw_m.map(get_movie_tuple).cache() + return ratings_rdd, movies_rdd + +if __name__ == '__main__': + conf = SparkConf().setMaster('spark://master:7077').setAppName('als_movie') + sc = SparkContext.getOrCreate(conf) + sc.setLogLevel('ERROR') + + rdd_ratings, rdd_movies = init_rdds(sc) + print(f"Ratings count: {rdd_ratings.count()}") + print(f"Movies count: {rdd_movies.count()}") + sc.stop() \ No newline at end of file