feat(self_recommend): add personalized movie recommendations based on collaborative filtering

- Add a data loading and preprocessing module; data is prepared via data_prepare.py
- Implement a movie recommendation system based on the ALS algorithm
- Add personal rating data to produce personalized recommendations
- Tune the model parameters to improve recommendation accuracy
- Print the recommendation results for easy review
fly6516 2025-04-22 15:55:10 +08:00
parent 107d84f167
commit ee82521560

from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS
import data_prepare
import math


# Compute RMSE between predicted and actual ratings
def computeError(predictedRDD, actualRDD):
    # Key both RDDs by (userID, movieID) so they can be joined
    predictedReformattedRDD = predictedRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
    actualReformattedRDD = actualRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
    squaredErrorsRDD = predictedReformattedRDD.join(actualReformattedRDD) \
        .map(lambda pre: (pre[1][0] - pre[1][1]) ** 2)
    totalError = squaredErrorsRDD.reduce(lambda a, b: a + b)
    numRatings = squaredErrorsRDD.count()
    return math.sqrt(float(totalError) / numRatings)


if __name__ == "__main__":
    import os
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

    # Initialize Spark context
    conf = SparkConf().setMaster("spark://master:7077").setAppName("self_recommend")
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel("ERROR")
    sc.addPyFile("data_prepare.py")

    # Load ratings and movie data
    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)

    # Split data into training, validation, and test sets (60%, 20%, 20%)
    trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=2)

    # 4.1 Add personal ratings (userID=0) as (userID, movieID, rating) tuples
    myUserID = 0
    myRatings = [
        (myUserID, 1, 4.0), (myUserID, 2, 5.0), (myUserID, 3, 3.0),
        (myUserID, 10, 4.5), (myUserID, 20, 2.0), (myUserID, 30, 5.0),
        (myUserID, 40, 4.0), (myUserID, 50, 3.5), (myUserID, 60, 4.0),
        (myUserID, 70, 4.5)
    ]
    myRatingsRDD = sc.parallelize(myRatings)

    # 4.2 Combine personal ratings with the training data
    trainingWithMyRDD = trainingRDD.union(myRatingsRDD)

    # 4.3 Train the model with the same parameters as before
    seed = 2
    iterations = 5
    regularizationParameter = 0.1
    bestRank = 8  # best rank found in the earlier collaborative-filtering runs
    myModel = ALS.train(trainingWithMyRDD, bestRank, seed=seed,
                        iterations=iterations, lambda_=regularizationParameter)

    # 4.4 Evaluate RMSE on the test set
    testPairs = testRDD.map(lambda x: (x[0], x[1]))
    predictedTestMyRDD = myModel.predictAll(testPairs)
    testRMSE = computeError(predictedTestMyRDD, testRDD)
    print('Test RMSE with personal ratings: %s' % testRMSE)

    # 4.5 Predict ratings for movies not rated yet
    ratedMovieIDs = [m[1] for m in myRatings]
    allMovieIDs = moviesRDD.map(lambda x: x[0]).collect()
    unrated = [(myUserID, mid) for mid in allMovieIDs if mid not in ratedMovieIDs]
    unratedRDD = sc.parallelize(unrated)
    predictedUnratedRDD = myModel.predictAll(unratedRDD)

    # 4.6 Keep movies with more than 75 ratings and output the top 25 recommendations
    counts = ratingsRDD.map(lambda x: (x[1], 1)).countByKey()
    candidates = predictedUnratedRDD.filter(lambda x: counts.get(x[1], 0) > 75)

    # Key predictions by movieID, join with (movieID, title), then rank by predicted rating
    recs = candidates.map(lambda x: (x[1], x[2])) \
        .join(moviesRDD.map(lambda x: (x[0], x[1]))) \
        .map(lambda x: x[1]) \
        .takeOrdered(25, key=lambda x: -x[0])

    # Print top recommendations
    print('Top 25 movie recommendations for you:')
    for rating, title in recs:
        print('%s: predicted rating %.2f' % (title, rating))

    # Stop the Spark context
    sc.stop()
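
The script imports data_prepare, which is not included in this commit's diff. From the way its return values are used above (ratings as (userID, movieID, rating) tuples, movies as (movieID, title) tuples), a minimal sketch of what init_rdds presumably does is shown below; the HDFS paths and the CSV layout are assumptions:

# Hypothetical sketch of data_prepare.py; not part of this commit.
# Assumes MovieLens-style CSV files with a header row; paths are guesses.
def init_rdds(sc):
    def load_csv(path):
        raw = sc.textFile(path)
        header = raw.first()
        # Drop the header row and split fields (naive split: breaks on quoted commas)
        return raw.filter(lambda line: line != header).map(lambda line: line.split(","))

    # (userID, movieID, rating) tuples, matching how the script indexes them
    ratingsRDD = load_csv("hdfs://master:9000/user/root/ratings.csv") \
        .map(lambda f: (int(f[0]), int(f[1]), float(f[2])))
    # (movieID, title) tuples
    moviesRDD = load_csv("hdfs://master:9000/user/root/movies.csv") \
        .map(lambda f: (int(f[0]), f[1]))
    return ratingsRDD, moviesRDD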
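
As a quick sanity check, computeError applied to two tiny RDDs of (userID, movieID, rating) tuples returns the root-mean-square difference over the matching (userID, movieID) pairs (toy data, assuming a live SparkContext sc):

pred = sc.parallelize([(0, 1, 4.0), (0, 2, 3.0)])
actual = sc.parallelize([(0, 1, 5.0), (0, 2, 3.0)])
# sqrt(((4.0 - 5.0)**2 + (3.0 - 3.0)**2) / 2) = sqrt(0.5) ~= 0.7071
print(computeError(pred, actual))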
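
Because the script sets the master URL in SparkConf and ships data_prepare.py itself via sc.addPyFile, it can presumably be launched directly with spark-submit; the script filename self_recommend.py is an assumption:

spark-submit self_recommend.py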