feat(self_recommand): 添加个性化电影推荐系统
- 实现了一个基于 Spark 的电影推荐系统 - 包含数据准备、模型训练、误差计算等功能 - 能够根据用户历史评分和未观看电影进行个性化推荐 - 输出 Top25 电影推荐列表
This commit is contained in:
parent
4d50e6cdc2
commit
de024388a1
79
self_recommand.py
Normal file
79
self_recommand.py
Normal file
@ -0,0 +1,79 @@
|
||||
from pyspark import SparkContext, SparkConf
|
||||
from pyspark.mllib.recommendation import ALS
|
||||
import data_prepare
|
||||
import math
|
||||
|
||||
# Compute RMSE between predicted and actual ratings
|
||||
def computeError(predictedRDD, actualRDD):
    """Return the RMSE between two ratings RDDs.

    Both RDDs contain (userID, movieID, rating) triples (pyspark
    ``Rating`` objects index the same way). Pairs are joined on
    (userID, movieID), so only ratings present in both RDDs contribute.

    Args:
        predictedRDD: RDD of predicted (userID, movieID, rating).
        actualRDD: RDD of ground-truth (userID, movieID, rating).

    Returns:
        float: root-mean-squared error over the joined pairs.
    """
    # Key both sides by (userID, movieID) so they can be joined.
    predictedPairsRDD = predictedRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
    actualPairsRDD = actualRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))

    squaredErrorsRDD = predictedPairsRDD.join(actualPairsRDD) \
        .map(lambda pre: (pre[1][0] - pre[1][1]) ** 2)

    # mean() computes sum and count in one pass; the original reduce()
    # followed by count() evaluated the (uncached) RDD twice and
    # reduce() would raise on an empty join result.
    return math.sqrt(squaredErrorsRDD.mean())
|
||||
|
||||
if __name__ == "__main__":
    import sys, os

    # Pin driver and workers to the same interpreter so pickled closures match.
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

    # Initialize Spark context against the standalone cluster master.
    conf = SparkConf().setMaster("spark://master:7077").setAppName("self_recommend")
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel("ERROR")
    # Ship the data-preparation module to the executors.
    sc.addPyFile("data_prepare.py")

    # Load ratings (userID, movieID, rating) and movies (movieID, title).
    # NOTE(review): tuple layout assumed from usage below — verify data_prepare.
    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)

    # Split 60/20/20 (Spark normalizes the weights); fixed seed for repeatability.
    trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=2)

    # 4.1 Add personal ratings under a userID (0) not present in the dataset.
    myUserID = 0
    myRatings = [
        (myUserID, 1, 4.0), (myUserID, 2, 5.0), (myUserID, 3, 3.0),
        (myUserID, 10, 4.5), (myUserID, 20, 2.0), (myUserID, 30, 5.0),
        (myUserID, 40, 4.0), (myUserID, 50, 3.5), (myUserID, 60, 4.0),
        (myUserID, 70, 4.5),
    ]
    myRatingsRDD = sc.parallelize(myRatings)

    # 4.2 Combine personal ratings with training data.
    trainingWithMyRDD = trainingRDD.union(myRatingsRDD)

    # 4.3 Train ALS with the same hyperparameters as the earlier experiments.
    seed = 2
    iterations = 5
    regularizationParameter = 0.1
    bestRank = 8  # best rank found during prior collaborative-filtering tuning
    myModel = ALS.train(trainingWithMyRDD, bestRank, seed=seed,
                        iterations=iterations, lambda_=regularizationParameter)

    # 4.4 Evaluate RMSE on the held-out test set.
    testPairs = testRDD.map(lambda x: (x[0], x[1]))
    predictedTestMyRDD = myModel.predictAll(testPairs)
    testRMSE = computeError(predictedTestMyRDD, testRDD)
    print('Test RMSE with personal ratings: %s' % testRMSE)

    # 4.5 Predict ratings for every movie I have not rated.
    # Use a set for O(1) membership tests (was a list: O(n) per lookup
    # inside the comprehension over all movie IDs).
    ratedMovieIDs = {m[1] for m in myRatings}
    allMovieIDs = moviesRDD.map(lambda x: x[0]).collect()
    unrated = [(myUserID, mid) for mid in allMovieIDs if mid not in ratedMovieIDs]
    unratedRDD = sc.parallelize(unrated)
    predictedUnratedRDD = myModel.predictAll(unratedRDD)

    # 4.6 Keep only movies with more than 75 ratings, then output the top 25.
    # counts is a driver-side dict {movieID: numRatings} captured in the closure.
    counts = ratingsRDD.map(lambda x: (x[1], 1)).countByKey()
    candidates = predictedUnratedRDD.filter(lambda x: counts.get(x[1], 0) > 75)

    # BUG FIX: key both sides of the join by movieID. The original mapped
    # predictions to (predictedRating, movieID), so the join with
    # (movieID, title) matched float ratings against movie IDs and
    # produced an empty/garbage recommendation list.
    recs = candidates.map(lambda x: (x[1], x[2])) \
        .join(moviesRDD.map(lambda x: (x[0], x[1]))) \
        .map(lambda x: x[1]) \
        .takeOrdered(25, key=lambda x: -x[0])

    # Print top recommendations as "title: predicted rating".
    print('Top 25 movie recommendations for you:')
    for rating, title in recs:
        print('%s: predicted rating %.2f' % (title, rating))

    sc.stop()
|
Loading…
Reference in New Issue
Block a user