als_movie/self_recommand.py
fly6516 ee82521560 feat(self_recommend): 添加基于协同过滤的个性化电影推荐功能
- 新增数据加载和预处理模块,使用 data_prepare.py 准备数据
- 实现了基于 ALS 算法的电影推荐系统
- 添加了个人评分数据,实现了个性化的电影推荐
- 优化了模型参数,提高了推荐系统的准确性
- 增加了推荐结果的输出,方便用户查看
2025-04-22 15:55:10 +08:00

79 lines
3.3 KiB
Python

from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS
import data_prepare
import math
def computeError(predictedRDD, actualRDD):
    """Return the root-mean-square error between predicted and actual ratings.

    Both arguments are RDD-like collections of indexable records whose first
    two fields identify a rating and whose third field is the rating value
    (this script passes ``(userID, movieID, rating)`` records). Records are
    matched on the pair formed by their first two fields; pairs that appear
    in only one of the inputs do not contribute to the error.

    Args:
        predictedRDD: records holding the predicted ratings.
        actualRDD: records holding the reference ratings.

    Returns:
        The RMSE over all matched pairs, as a float.

    Raises:
        ValueError: if the two inputs have no pair in common, so no error
            can be computed.
    """
    def keyByPair(record):
        # ((field0, field1), rating) so both inputs can be joined on the pair.
        return (record[0], record[1]), record[2]

    squaredErrorsRDD = (
        predictedRDD.map(keyByPair)
        .join(actualRDD.map(keyByPair))
        .map(lambda joined: (joined[1][0] - joined[1][1]) ** 2)
    )

    # A single aggregate pass yields both the sum and the count; separate
    # reduce() and count() actions would each re-evaluate the join.
    totalError, numRatings = squaredErrorsRDD.aggregate(
        (0.0, 0),
        lambda acc, err: (acc[0] + err, acc[1] + 1),
        lambda left, right: (left[0] + right[0], left[1] + right[1]),
    )

    if numRatings == 0:
        raise ValueError(
            "Cannot compute RMSE: predicted and actual ratings share no "
            "(user, movie) pairs"
        )
    return math.sqrt(totalError / numRatings)
if __name__ == "__main__":
    import os

    # Driver and executors must use the same Python interpreter.
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

    # Initialize Spark context
    conf = SparkConf().setMaster("spark://master:7077").setAppName("self_recommend")
    sc = SparkContext.getOrCreate(conf)
    try:
        sc.setLogLevel("ERROR")
        # Ship the helper module so executors can import it as well.
        sc.addPyFile("data_prepare.py")

        # Load ratings and movie data
        ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)

        # Split data into training, validation, and test sets (60/20/20)
        trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=2)

        # 4.1 Add personal ratings (userID=0) as (userID, movieID, rating)
        myUserID = 0
        myRatings = [
            (myUserID, 1, 4.0), (myUserID, 2, 5.0), (myUserID, 3, 3.0),
            (myUserID, 10, 4.5), (myUserID, 20, 2.0), (myUserID, 30, 5.0),
            (myUserID, 40, 4.0), (myUserID, 50, 3.5), (myUserID, 60, 4.0),
            (myUserID, 70, 4.5),
        ]
        myRatingsRDD = sc.parallelize(myRatings)

        # 4.2 Combine personal ratings with training data
        trainingWithMyRDD = trainingRDD.union(myRatingsRDD)

        # 4.3 Train model with the same parameters
        seed = 2
        iterations = 5
        regularizationParameter = 0.1
        bestRank = 8  # Optimize this based on previous collaborative filtering
        myModel = ALS.train(
            trainingWithMyRDD,
            bestRank,
            seed=seed,
            iterations=iterations,
            lambda_=regularizationParameter,
        )

        # 4.4 Evaluate RMSE on the test set
        testPairs = testRDD.map(lambda x: (x[0], x[1]))
        predictedTestMyRDD = myModel.predictAll(testPairs)
        testRMSE = computeError(predictedTestMyRDD, testRDD)
        print('Test RMSE with personal ratings: %s' % testRMSE)

        # 4.5 Predict ratings for unrated movies
        # A set keeps the membership test O(1) per movie.
        ratedMovieIDs = {m[1] for m in myRatings}
        allMovieIDs = moviesRDD.map(lambda x: x[0]).collect()
        unrated = [(myUserID, mid) for mid in allMovieIDs if mid not in ratedMovieIDs]
        unratedRDD = sc.parallelize(unrated)
        predictedUnratedRDD = myModel.predictAll(unratedRDD)

        # 4.6 Filter movies with more than 75 ratings and output top 25 recommendations
        counts = ratingsRDD.map(lambda x: (x[1], 1)).countByKey()
        # Only the qualifying IDs are captured by the filter closure, not the
        # whole per-movie count table.
        popularMovieIDs = {mid for mid, n in counts.items() if n > 75}
        candidates = predictedUnratedRDD.filter(lambda x: x[1] in popularMovieIDs)

        # Key predictions by movieID so the join matches the (movieID, title)
        # pairs; keying by the predicted rating would join ratings to IDs.
        recs = candidates.map(lambda x: (x[1], x[2])) \
            .join(moviesRDD.map(lambda x: (x[0], x[1]))) \
            .map(lambda x: x[1]) \
            .takeOrdered(25, key=lambda x: -x[0])

        # Print top recommendations; each entry is (predictedRating, title)
        print('Top 25 movie recommendations for you:')
        for rating, title in recs:
            print('%s: predicted rating %.2f' % (title, rating))
    finally:
        # Release the Spark context even if a job above fails.
        sc.stop()