"""Personalized movie recommendation with Spark MLlib ALS.

Adds a synthetic user's ratings (userID=0) to the training set, retrains
an ALS model, reports test-set RMSE, and prints the top-25 recommended
titles among movies the user has not yet rated (restricted to movies
with more than 75 ratings overall).
"""
from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS
import data_prepare
import math


def computeError(predictedRDD, actualRDD):
    """Return the RMSE between two RDDs of (userID, movieID, rating) triples.

    Only (userID, movieID) pairs present in BOTH RDDs contribute to the
    error.  Returns NaN when the RDDs share no keys -- reduce() on an
    empty RDD would otherwise raise ValueError.
    """
    predictedPairs = predictedRDD.map(lambda r: ((r[0], r[1]), r[2]))
    actualPairs = actualRDD.map(lambda r: ((r[0], r[1]), r[2]))
    squaredErrors = predictedPairs.join(actualPairs) \
                                  .map(lambda kv: (kv[1][0] - kv[1][1]) ** 2)
    numRatings = squaredErrors.count()
    if numRatings == 0:
        return float('nan')
    totalError = squaredErrors.reduce(lambda a, b: a + b)
    return math.sqrt(float(totalError) / numRatings)


if __name__ == "__main__":
    import os
    # Pin driver and worker interpreters so closures deserialize under
    # the same Python version cluster-wide.
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

    conf = SparkConf().setMaster("spark://master:7077").setAppName("self_recommend")
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel("ERROR")
    # Ship the data-loading helper module to the executors.
    sc.addPyFile("data_prepare.py")

    # ratingsRDD: (userID, movieID, rating); moviesRDD: (movieID, title)
    # -- assumed from the usage below; confirm against data_prepare.init_rdds.
    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)
    # Split into training / validation / test (60% / 20% / 20%).
    trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=2)

    # 4.1 Personal ratings for the synthetic user (userID=0).
    myUserID = 0
    myRatings = [
        (myUserID, 1, 4.0), (myUserID, 2, 5.0), (myUserID, 3, 3.0),
        (myUserID, 10, 4.5), (myUserID, 20, 2.0), (myUserID, 30, 5.0),
        (myUserID, 40, 4.0), (myUserID, 50, 3.5), (myUserID, 60, 4.0),
        (myUserID, 70, 4.5),
    ]
    myRatingsRDD = sc.parallelize(myRatings)

    # 4.2 Merge personal ratings into the training data.
    trainingWithMyRDD = trainingRDD.union(myRatingsRDD)

    # 4.3 Retrain ALS with the previously tuned hyper-parameters.
    seed = 2
    iterations = 5
    regularizationParameter = 0.1
    bestRank = 8  # best rank found during earlier cross-validation
    myModel = ALS.train(trainingWithMyRDD, bestRank, seed=seed,
                        iterations=iterations, lambda_=regularizationParameter)

    # 4.4 RMSE on the held-out test set.
    testPairs = testRDD.map(lambda x: (x[0], x[1]))
    predictedTestMyRDD = myModel.predictAll(testPairs)
    testRMSE = computeError(predictedTestMyRDD, testRDD)
    print('Test RMSE with personal ratings: %s' % testRMSE)

    # 4.5 Predict ratings for every movie the user has not rated.
    ratedMovieIDs = set(m[1] for m in myRatings)  # set: O(1) membership tests
    allMovieIDs = moviesRDD.map(lambda x: x[0]).collect()
    unrated = [(myUserID, mid) for mid in allMovieIDs if mid not in ratedMovieIDs]
    predictedUnratedRDD = myModel.predictAll(sc.parallelize(unrated))

    # 4.6 Keep movies with > 75 ratings and print the top-25 predictions.
    counts = ratingsRDD.map(lambda x: (x[1], 1)).countByKey()
    candidates = predictedUnratedRDD.filter(lambda p: counts.get(p[1], 0) > 75)
    # BUG FIX: the join must be keyed on movieID, not on the predicted
    # rating -- the previous (rating, movieID) keying matched float
    # ratings against integer movie IDs and produced an empty result.
    recs = candidates.map(lambda p: (p[1], p[2])) \
                     .join(moviesRDD.map(lambda m: (m[0], m[1]))) \
                     .map(lambda kv: kv[1]) \
                     .takeOrdered(25, key=lambda rt: -rt[0])

    print('Top 25 movie recommendations for you:')
    for rating, title in recs:
        print('%s: predicted rating %.2f' % (title, rating))

    sc.stop()