"""Personalized movie recommendation with Spark ALS collaborative filtering.

Trains an ALS model on the ratings data augmented with a synthetic
"personal" user (userID=0), evaluates RMSE on a held-out test split,
and prints the top-25 recommended movies the personal user has not yet
rated, restricted to movies with more than 75 ratings.

Ratings RDD rows are (userID, movieID, rating) triples; movies RDD rows
are (movieID, title) pairs — as produced by data_prepare.init_rdds.
"""

import math
import os

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

import data_prepare


def computeError(predictedRDD, actualRDD):
    """Return the RMSE between predicted and actual ratings.

    Both RDDs hold (userID, movieID, rating) triples; they are joined on
    the (userID, movieID) pair so only commonly-rated pairs contribute.

    :param predictedRDD: RDD of predicted (userID, movieID, rating)
    :param actualRDD: RDD of actual (userID, movieID, rating)
    :return: root-mean-squared error as a float
    """
    predictedReformattedRDD = predictedRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
    actualReformattedRDD = actualRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
    squaredErrorsRDD = predictedReformattedRDD.join(actualReformattedRDD) \
        .map(lambda pre: (pre[1][0] - pre[1][1]) ** 2)
    # Cache: this RDD is consumed twice (reduce + count); without caching
    # the whole join would be recomputed for the second action.
    squaredErrorsRDD.cache()
    totalError = squaredErrorsRDD.reduce(lambda a, b: a + b)
    numRatings = squaredErrorsRDD.count()
    return math.sqrt(float(totalError) / numRatings)


if __name__ == "__main__":
    # Pin the worker/driver interpreter so executors match the driver.
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

    # Initialize Spark context
    conf = SparkConf().setMaster("spark://master:7077").setAppName("self_recommend")
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel("ERROR")
    # Ship the helper module to executors so its functions deserialize there.
    sc.addPyFile("data_prepare.py")

    # Load ratings and movie data
    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)
    # Split data into training, validation, and test sets (60/20/20).
    trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=2)

    # 4.1 Add personal ratings (userID=0)
    myUserID = 0
    myRatings = [
        (myUserID, 1, 4.0), (myUserID, 2, 5.0), (myUserID, 3, 3.0),
        (myUserID, 10, 4.5), (myUserID, 20, 2.0), (myUserID, 30, 5.0),
        (myUserID, 40, 4.0), (myUserID, 50, 3.5), (myUserID, 60, 4.0),
        (myUserID, 70, 4.5)
    ]
    myRatingsRDD = sc.parallelize(myRatings)

    # 4.2 Combine personal ratings with training data
    trainingWithMyRDD = trainingRDD.union(myRatingsRDD)

    # 4.3 Train model with the same parameters
    seed = 2
    iterations = 5
    regularizationParameter = 0.1
    bestRank = 8  # Optimize this based on previous collaborative filtering
    myModel = ALS.train(trainingWithMyRDD, bestRank, seed=seed,
                        iterations=iterations, lambda_=regularizationParameter)

    # 4.4 Evaluate RMSE on the test set
    testPairs = testRDD.map(lambda x: (x[0], x[1]))
    predictedTestMyRDD = myModel.predictAll(testPairs)
    testRMSE = computeError(predictedTestMyRDD, testRDD)
    print('Test RMSE with personal ratings: %s' % testRMSE)

    # 4.5 Predict ratings for unrated movies.
    # Set membership is O(1) per lookup vs O(n) for the original list.
    ratedMovieIDs = {m[1] for m in myRatings}
    allMovieIDs = moviesRDD.map(lambda x: x[0]).collect()
    unrated = [(myUserID, mid) for mid in allMovieIDs if mid not in ratedMovieIDs]
    unratedRDD = sc.parallelize(unrated)
    predictedUnratedRDD = myModel.predictAll(unratedRDD)

    # 4.6 Filter movies with more than 75 ratings and output top 25 recommendations
    counts = ratingsRDD.map(lambda x: (x[1], 1)).countByKey()
    candidates = predictedUnratedRDD.filter(lambda x: counts.get(x[1], 0) > 75)
    # BUG FIX: the join must be keyed on movieID. The original mapped
    # candidates to (predictedRating, movieID), so the join matched a
    # float rating against integer movie IDs and produced no (or garbage)
    # pairs. Key on movieID, join with (movieID, title), then keep the
    # (predictedRating, title) payload.
    recs = candidates.map(lambda x: (x[1], x[2])) \
        .join(moviesRDD.map(lambda x: (x[0], x[1]))) \
        .map(lambda x: x[1]) \
        .takeOrdered(25, key=lambda x: -x[0])

    # Print top recommendations
    print('Top 25 movie recommendations for you:')
    for rating, title in recs:
        print('%s: predicted rating %.2f' % (title, rating))

    sc.stop()