feat(data): add data preparation script
- Add data_prepare.py for initializing and processing the movie-ratings data
- Implement reading the ratings and movies data from HDFS
- Provide parsing and caching logic in preparation for downstream processing
parent 254e595256
commit 3cb7ec6dba
data_prepare.py (new file, 40 lines)
@@ -0,0 +1,40 @@
from pyspark import SparkContext, SparkConf
import os

# Set the Java environment variable
os.environ['JAVA_HOME'] = '/opt/module/jdk1.8.0_171'

# Parse a ratings line into (userID, movieID, rating)
def get_ratings_tuple(entry):
    user, movie, rating, _ = entry.split('::')
    return int(user), int(movie), float(rating)

# Parse a movies line into (movieID, title)
def get_movie_tuple(entry):
    mid, title, _ = entry.split('::')
    return int(mid), title

# Generate a deterministic key for sorting: equal scores fall back to title order
def sort_key(rec):
    score, name = rec
    return f"{score:06.3f} {name}"

# Initialize and return ratingsRDD and moviesRDD
def init_rdds(sc, hdfs_base='hdfs://master:9000/user/root/als_movie'):
    ratings_path = f"{hdfs_base}/ratings.txt"
    movies_path = f"{hdfs_base}/movies.dat"
    raw_r = sc.textFile(ratings_path).repartition(2)
    raw_m = sc.textFile(movies_path)
    ratings_rdd = raw_r.map(get_ratings_tuple).cache()
    movies_rdd = raw_m.map(get_movie_tuple).cache()
    return ratings_rdd, movies_rdd

if __name__ == '__main__':
    conf = SparkConf().setMaster('spark://master:7077').setAppName('als_movie')
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel('ERROR')

    rdd_ratings, rdd_movies = init_rdds(sc)
    print(f"Ratings count: {rdd_ratings.count()}")
    print(f"Movies count: {rdd_movies.count()}")
    sc.stop()
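The parsing helpers and sort_key can be exercised without the cluster. The sketch below is a hedged illustration: the "::"-delimited sample lines follow the usual MovieLens layout but are hypothetical, it assumes data_prepare.py is importable from the working directory, and sort_key is defined but not called in this file, so its use with sorted() here is only illustrative.

from data_prepare import get_ratings_tuple, get_movie_tuple, sort_key

# Check the parsing helpers against hypothetical MovieLens-style "::" lines
assert get_ratings_tuple("1::1193::5::978300760") == (1, 1193, 5.0)
assert get_movie_tuple("2::Jumanji (1995)::Adventure|Children|Fantasy") == (2, "Jumanji (1995)")

# sort_key embeds both score and title, so ties on score break deterministically;
# the same key function also works with RDD.sortBy().
records = [(4.5, "B"), (4.5, "A"), (3.0, "C")]
print(sorted(records, key=sort_key, reverse=True))
# -> [(4.5, 'B'), (4.5, 'A'), (3.0, 'C')]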