From 107d84f167fa3e40c07d5e15a2c73bba8cdf9ac8 Mon Sep 17 00:00:00 2001
From: fly6516
Date: Tue, 22 Apr 2025 15:54:23 +0800
Subject: [PATCH] refactor(self_recommand): restructure the movie
 recommendation system

- Optimized the data loading and preprocessing logic
- Improved the RMSE computation function, adding handling for empty data
- Added prediction and evaluation on the validation and test sets
- Removed the personal-ratings and movie-recommendation code
---
 self_recommand.py | 127 ++++++++++++++++++++++------------------------
 1 file changed, 61 insertions(+), 66 deletions(-)

diff --git a/self_recommand.py b/self_recommand.py
index 56fd4f7..ba71d60 100644
--- a/self_recommand.py
+++ b/self_recommand.py
@@ -1,79 +1,74 @@
-from pyspark import SparkContext, SparkConf
-from pyspark.mllib.recommendation import ALS
-import data_prepare
+from pyspark import SparkContext
+from pyspark.mllib.recommendation import ALS, Rating
 import math
 
-# Compute RMSE between predicted and actual ratings
+# Initialize Spark context
+sc = SparkContext(appName="MovieLensALS")
+
+# Load ratings data: format (userId, movieId, rating)
+rawData = sc.textFile("hdfs://master:9000/user/root/ratings.csv")
+header = rawData.first()
+rawData = rawData.filter(lambda line: line != header)
+
+# Convert to Rating objects and filter incomplete lines
+ratingsRDD = rawData.map(lambda line: line.split(",")) \
+    .filter(lambda fields: len(fields) >= 3) \
+    .map(lambda fields: Rating(int(fields[0]), int(fields[1]), float(fields[2])))
+
+# Split data into training, validation, and test sets (60%, 20%, 20%)
+trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0)
+print("Training: %d, validation: %d, test: %d" % (trainingRDD.count(), validationRDD.count(), testRDD.count()))
+
+# Set ALS model parameters
+rank = 10
+numIterations = 10
+lambda_ = 0.1
+
+# Train ALS model on the training data
+myModel = ALS.train(trainingRDD, rank, iterations=numIterations, lambda_=lambda_)
+
+# Function to compute RMSE between predicted and actual ratings
 def computeError(predictedRDD, actualRDD):
-    predictedReformattedRDD = predictedRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
-    actualReformattedRDD = actualRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
-    squaredErrorsRDD = predictedReformattedRDD.join(actualReformattedRDD) \
-        .map(lambda pre: (pre[1][0] - pre[1][1]) ** 2)
+    predictedReformattedRDD = predictedRDD.filter(lambda r: len(r) == 3) \
+        .map(lambda r: ((r[0], r[1]), r[2]))
+    actualReformattedRDD = actualRDD.filter(lambda r: len(r) == 3) \
+        .map(lambda r: ((r[0], r[1]), r[2]))
+    joined = predictedReformattedRDD.join(actualReformattedRDD)
+    squaredErrorsRDD = joined.map(lambda pair: (pair[1][0] - pair[1][1]) ** 2)
     totalError = squaredErrorsRDD.reduce(lambda a, b: a + b)
     numRatings = squaredErrorsRDD.count()
-    return math.sqrt(float(totalError) / numRatings)
+    return math.sqrt(float(totalError) / numRatings) if numRatings != 0 else float('nan')
 
-if __name__ == "__main__":
-    import sys, os
-    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
-    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
+# Predict and evaluate on the validation set
+validationPairs = validationRDD.map(lambda rating: (rating[0], rating[1]))
+predictedValRDD = myModel.predictAll(validationPairs)
 
-    # Initialize Spark context
-    conf = SparkConf().setMaster("spark://master:7077").setAppName("self_recommend")
-    sc = SparkContext.getOrCreate(conf)
-    sc.setLogLevel("ERROR")
-    sc.addPyFile("data_prepare.py")
+print("\nSample validation predictions:")
+print(predictedValRDD.take(3))
+print(validationRDD.take(3))
 
-    # Load ratings and movie data
-    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)
-    # Split data into training, validation, and test sets
-    trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=2)
+try:
+    validationRMSE = computeError(predictedValRDD, validationRDD)
+    print("Validation RMSE: %f" % validationRMSE)
+except Exception as e:
+    print("Error computing validation RMSE:", str(e))
 
-    # 4.1 Add personal ratings (userID=0)
-    myUserID = 0
-    myRatings = [
-        (myUserID, 1, 4.0), (myUserID, 2, 5.0), (myUserID, 3, 3.0),
-        (myUserID, 10, 4.5), (myUserID, 20, 2.0), (myUserID, 30, 5.0),
-        (myUserID, 40, 4.0), (myUserID, 50, 3.5), (myUserID, 60, 4.0),
-        (myUserID, 70, 4.5)
-    ]
-    myRatingsRDD = sc.parallelize(myRatings)
+# Predict and evaluate on the test set
+testPairs = testRDD.map(lambda rating: (rating[0], rating[1]))
+predictedTestRDD = myModel.predictAll(testPairs)
 
-    # 4.2 Combine personal ratings with training data
-    trainingWithMyRDD = trainingRDD.union(myRatingsRDD)
+print("\nSample test predictions:")
+print(predictedTestRDD.take(3))
+print(testRDD.take(3))
 
-    # 4.3 Train model with the same parameters
-    seed = 2
-    iterations = 5
-    regularizationParameter = 0.1
-    bestRank = 8  # Optimize this based on previous collaborative filtering
-    myModel = ALS.train(trainingWithMyRDD, bestRank, seed=seed, iterations=iterations, lambda_=regularizationParameter)
+print("Predicted count: %d" % predictedTestRDD.count())
+print("Test count: %d" % testRDD.count())
 
-    # 4.4 Evaluate RMSE on the test set
-    testPairs = testRDD.map(lambda x: (x[0], x[1]))
-    predictedTestMyRDD = myModel.predictAll(testPairs)
-    testRMSE = computeError(predictedTestMyRDD, testRDD)
-    print('Test RMSE with personal ratings: %s' % testRMSE)
+try:
+    testRMSE = computeError(predictedTestRDD, testRDD)
+    print("Test RMSE: %f" % testRMSE)
+except Exception as e:
+    print("Error computing test RMSE:", str(e))
 
-    # 4.5 Predict ratings for unrated movies
-    ratedMovieIDs = [m[1] for m in myRatings]
-    allMovieIDs = moviesRDD.map(lambda x: x[0]).collect()
-    unrated = [(myUserID, mid) for mid in allMovieIDs if mid not in ratedMovieIDs]
-    unratedRDD = sc.parallelize(unrated)
-    predictedUnratedRDD = myModel.predictAll(unratedRDD)
-
-    # 4.6 Filter movies with more than 75 ratings and output top 25 recommendations
-    counts = ratingsRDD.map(lambda x: (x[1], 1)).countByKey()
-    candidates = predictedUnratedRDD.filter(lambda x: counts.get(x[1], 0) > 75)
-    # Map to (predictedRating, movieID) then join with movie titles
-    recs = candidates.map(lambda x: (x[2], x[1])) \
-        .join(moviesRDD.map(lambda x: (x[0], x[1]))) \
-        .map(lambda x: x[1]) \
-        .takeOrdered(25, key=lambda x: -x[0])
-
-    # Print top recommendations
-    print('Top 25 movie recommendations for you:')
-    for rating, title in recs:
-        print('%s: predicted rating %.2f' % (title, rating))
-
-    sc.stop()
\ No newline at end of file
+# Stop the Spark context
+sc.stop()
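
Reviewer note: the empty-data guard added to computeError() can be exercised
without the cluster. Below is a minimal, self-contained sketch of the same
join-then-guard pattern; the local[*] master and the toy ratings are
illustrative assumptions, not part of this patch.

    import math
    from pyspark import SparkContext

    sc = SparkContext("local[*]", "rmse-sketch")

    # Toy (user, movie, rating) triples standing in for predictions and truth.
    predicted = sc.parallelize([(1, 10, 3.8), (1, 20, 2.5)])
    actual = sc.parallelize([(1, 10, 4.0), (1, 20, 2.0)])

    # Key both sides on (user, movie) so join() pairs each prediction
    # with its ground-truth rating, as computeError() does in the patch.
    predPairs = predicted.map(lambda r: ((r[0], r[1]), r[2]))
    actPairs = actual.map(lambda r: ((r[0], r[1]), r[2]))
    squaredErrors = predPairs.join(actPairs).map(lambda p: (p[1][0] - p[1][1]) ** 2)

    # Mirror the patched guard: an empty join yields NaN instead of a
    # ZeroDivisionError. sum() is used here, which (unlike reduce) also
    # tolerates an empty RDD.
    n = squaredErrors.count()
    rmse = math.sqrt(squaredErrors.sum() / n) if n != 0 else float("nan")
    print("RMSE: %f" % rmse)  # ~0.380789

    sc.stop()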