als_movie/data_prepare.py
fly6516 73ae9b135b fix: set PySpark Python environment variables
- Add PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON environment variable settings
- Specify Python 3.6 as the Python version for PySpark
2025-04-22 13:39:30 +08:00


from pyspark import SparkContext, SparkConf
import os

# Set Java environment variable
os.environ['JAVA_HOME'] = '/opt/module/jdk1.8.0_171'
os.environ["PYSPARK_PYTHON"] = "/usr/local/bin/python3.6"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/bin/python3.6"


# Parse ratings data into (userID, movieID, rating)
def get_ratings_tuple(entry):
    user, movie, rating, _ = entry.split('::')
    return int(user), int(movie), float(rating)


# Parse movies data into (movieID, title)
def get_movie_tuple(entry):
    mid, title, _ = entry.split('::')
    return int(mid), title


# Generate deterministic key for sorting
def sort_key(rec):
    score, name = rec
    return f"{score:06.3f} {name}"


# Initialize and return ratingsRDD, moviesRDD
def init_rdds(sc, hdfs_base='hdfs://master:9000/user/root/als_movie'):
    ratings_path = f"{hdfs_base}/ratings.txt"
    movies_path = f"{hdfs_base}/movies.dat"
    raw_r = sc.textFile(ratings_path).repartition(2)
    raw_m = sc.textFile(movies_path)
    ratings_rdd = raw_r.map(get_ratings_tuple).cache()
    movies_rdd = raw_m.map(get_movie_tuple).cache()
    return ratings_rdd, movies_rdd


if __name__ == '__main__':
    conf = SparkConf().setMaster('spark://master:7077').setAppName('als_movie')
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel('ERROR')
    rdd_ratings, rdd_movies = init_rdds(sc)
    print(f"Ratings count: {rdd_ratings.count()}")
    print(f"Movies count: {rdd_movies.count()}")
    sc.stop()
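
For reference, a minimal usage sketch (not part of data_prepare.py) of how the prepared RDDs and sort_key might be consumed downstream: it assumes an active SparkContext and the rdd_ratings / rdd_movies returned by init_rdds, and would have to run before sc.stop().

# Rank movies by average rating, using sort_key for deterministic tie-breaking
avg_by_movie = (rdd_ratings
                .map(lambda t: (t[1], (t[2], 1)))                      # (movieID, (rating, 1))
                .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))  # sum ratings and counts
                .mapValues(lambda s: s[0] / s[1]))                     # (movieID, avgRating)
ranked = (avg_by_movie
          .join(rdd_movies)                                            # (movieID, (avgRating, title))
          .map(lambda kv: (kv[1][0], kv[1][1]))                        # (avgRating, title)
          .sortBy(sort_key, ascending=False))
print(ranked.take(5))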