from pyspark import SparkContext, SparkConf
import os

# Make sure Spark executors resolve the expected JDK installation.
os.environ['JAVA_HOME'] = '/opt/module/jdk1.8.0_171'


def get_ratings_tuple(entry):
    """Parse one ratings line 'user::movie::rating::timestamp'.

    Returns:
        (userID, movieID, rating) as (int, int, float); the trailing
        timestamp field is discarded.

    Raises:
        ValueError: if the line does not contain exactly four '::' fields
        or the numeric fields are malformed.
    """
    user, movie, rating, _ = entry.split('::')
    return int(user), int(movie), float(rating)


def get_movie_tuple(entry):
    """Parse one movies line 'movieID::title::genres'.

    Returns:
        (movieID, title) as (int, str); the genres field is discarded.

    Raises:
        ValueError: if the line does not contain exactly three '::' fields.
    """
    mid, title, _ = entry.split('::')
    return int(mid), title


def sort_key(rec):
    """Build a deterministic string sort key from a (score, name) pair.

    Zero-padding the score to a fixed width ('06.3f') makes lexicographic
    order agree with numeric order for non-negative scores below 100, and
    appending the name breaks ties deterministically.
    """
    score, name = rec
    return f"{score:06.3f} {name}"


def init_rdds(sc, hdfs_base='hdfs://master:9000/user/root/als_movie',
              num_partitions=2):
    """Load, parse, and cache the ratings and movies RDDs.

    Args:
        sc: an active SparkContext.
        hdfs_base: HDFS directory containing ratings.txt and movies.dat.
        num_partitions: partition count for the ratings RDD (previously a
            hard-coded constant; default preserves the original behavior).

    Returns:
        (ratings_rdd, movies_rdd): cached RDDs of
        (userID, movieID, rating) and (movieID, title) tuples.
    """
    ratings_path = f"{hdfs_base}/ratings.txt"
    movies_path = f"{hdfs_base}/movies.dat"
    raw_ratings = sc.textFile(ratings_path).repartition(num_partitions)
    raw_movies = sc.textFile(movies_path)
    # cache() both RDDs: they are typically reused across multiple actions.
    ratings_rdd = raw_ratings.map(get_ratings_tuple).cache()
    movies_rdd = raw_movies.map(get_movie_tuple).cache()
    return ratings_rdd, movies_rdd


if __name__ == '__main__':
    conf = SparkConf().setMaster('spark://master:7077').setAppName('als_movie')
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel('ERROR')

    rdd_ratings, rdd_movies = init_rdds(sc)
    print(f"Ratings count: {rdd_ratings.count()}")
    print(f"Movies count: {rdd_movies.count()}")
    sc.stop()