# Changelog:
# - Added data loading and preprocessing (data prepared via data_prepare.py)
# - Implemented an ALS-based movie recommendation system
# - Added personal ratings to produce personalized recommendations
# - Tuned model parameters to improve recommendation accuracy
# - Added printing of recommendation results for the user
from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS
import data_prepare
import math
# Compute RMSE between predicted and actual ratings
def computeError(predictedRDD, actualRDD):
    """Return the root-mean-square error between two ratings RDDs.

    Args:
        predictedRDD: RDD of (userID, movieID, predictedRating) tuples
            (pyspark.mllib Rating objects also work: they index the same way).
        actualRDD: RDD of (userID, movieID, actualRating) tuples.

    Returns:
        float: RMSE computed over the (userID, movieID) pairs present in
        BOTH RDDs — the inner join silently drops pairs missing from
        either side.
    """
    # Key both RDDs by (userID, movieID) so they can be joined.
    predictedKeyed = predictedRDD.map(lambda r: ((r[0], r[1]), r[2]))
    actualKeyed = actualRDD.map(lambda r: ((r[0], r[1]), r[2]))
    # Inner-join on (userID, movieID) and square each rating difference.
    squaredErrorsRDD = predictedKeyed.join(actualKeyed) \
        .map(lambda pair: (pair[1][0] - pair[1][1]) ** 2)
    # mean() computes the average in a single pass; the original
    # reduce() + count() launched two Spark jobs and recomputed the
    # (uncached) joined RDD twice.
    return math.sqrt(squaredErrorsRDD.mean())
if __name__ == "__main__":
    import os

    # Make cluster workers use the same Python interpreter as the driver.
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

    # Initialize Spark context
    conf = SparkConf().setMaster("spark://master:7077").setAppName("self_recommend")
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel("ERROR")
    # Ship the data-preparation module to the worker nodes.
    sc.addPyFile("data_prepare.py")

    # Load ratings and movie data.
    # Assumed layout (from usage below — confirm against data_prepare.py):
    #   ratingsRDD: (userID, movieID, rating); moviesRDD: (movieID, title)
    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)

    # Split data into training (60%), validation (20%) and test (20%) sets.
    trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=2)

    # 4.1 Add personal ratings (userID=0)
    myUserID = 0
    myRatings = [
        (myUserID, 1, 4.0), (myUserID, 2, 5.0), (myUserID, 3, 3.0),
        (myUserID, 10, 4.5), (myUserID, 20, 2.0), (myUserID, 30, 5.0),
        (myUserID, 40, 4.0), (myUserID, 50, 3.5), (myUserID, 60, 4.0),
        (myUserID, 70, 4.5)
    ]
    myRatingsRDD = sc.parallelize(myRatings)

    # 4.2 Combine personal ratings with training data
    trainingWithMyRDD = trainingRDD.union(myRatingsRDD)

    # 4.3 Train model with the same parameters
    seed = 2
    iterations = 5
    regularizationParameter = 0.1
    bestRank = 8  # best rank found in the earlier collaborative-filtering run
    myModel = ALS.train(trainingWithMyRDD, bestRank, seed=seed,
                        iterations=iterations, lambda_=regularizationParameter)

    # 4.4 Evaluate RMSE on the test set
    testPairs = testRDD.map(lambda x: (x[0], x[1]))
    predictedTestMyRDD = myModel.predictAll(testPairs)
    testRMSE = computeError(predictedTestMyRDD, testRDD)
    print('Test RMSE with personal ratings: %s' % testRMSE)

    # 4.5 Predict ratings for movies I have not rated yet.
    # Use a set for O(1) membership tests (the original used a list,
    # making the comprehension below O(rated * movies)).
    ratedMovieIDs = {m[1] for m in myRatings}
    allMovieIDs = moviesRDD.map(lambda x: x[0]).collect()
    unrated = [(myUserID, mid) for mid in allMovieIDs if mid not in ratedMovieIDs]
    unratedRDD = sc.parallelize(unrated)
    predictedUnratedRDD = myModel.predictAll(unratedRDD)

    # 4.6 Keep only movies with more than 75 ratings, output top 25.
    # counts is a driver-side dict {movieID: numRatings}; it is captured
    # by the filter closure and shipped to the workers.
    counts = ratingsRDD.map(lambda x: (x[1], 1)).countByKey()
    candidates = predictedUnratedRDD.filter(lambda x: counts.get(x[1], 0) > 75)

    # BUG FIX: the join must be keyed on movieID. The original keyed the
    # predictions as (predictedRating, movieID), so the join matched
    # rating *values* against movie *IDs* and yielded garbage/empty
    # recommendations. Key by movieID, join with (movieID, title),
    # then take the 25 highest predicted ratings.
    recs = candidates.map(lambda x: (x[1], x[2])) \
        .join(moviesRDD.map(lambda x: (x[0], x[1]))) \
        .map(lambda x: (x[1][0], x[1][1])) \
        .takeOrdered(25, key=lambda x: -x[0])

    # Print top recommendations
    print('Top 25 movie recommendations for you:')
    for rating, title in recs:
        print('%s: predicted rating %.2f' % (title, rating))

    sc.stop()