from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS
import data_prepare
import math


# Compute RMSE between predicted and actual ratings
def computeError(predictedRDD, actualRDD):
    # Key both RDDs by (userID, movieID) so they can be joined
    predictedReformattedRDD = predictedRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
    actualReformattedRDD = actualRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
    squaredErrorsRDD = predictedReformattedRDD.join(actualReformattedRDD) \
        .map(lambda pre: (pre[1][0] - pre[1][1]) ** 2)
    totalError = squaredErrorsRDD.reduce(lambda a, b: a + b)
    numRatings = squaredErrorsRDD.count()
    return math.sqrt(float(totalError) / numRatings)


if __name__ == "__main__":
    import os
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

    # Initialize Spark context
    conf = SparkConf().setMaster("spark://master:7077").setAppName("self_recommend")
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel("ERROR")
    sc.addPyFile("data_prepare.py")

    # Load ratings (userID, movieID, rating) and movie (movieID, title) data
    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)

    # Split data into training, validation, and test sets (60% / 20% / 20%)
    trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=2)

    # 4.1 Add personal ratings (userID=0)
    myUserID = 0
    myRatings = [
        (myUserID, 1, 4.0), (myUserID, 2, 5.0), (myUserID, 3, 3.0),
        (myUserID, 10, 4.5), (myUserID, 20, 2.0), (myUserID, 30, 5.0),
        (myUserID, 40, 4.0), (myUserID, 50, 3.5), (myUserID, 60, 4.0),
        (myUserID, 70, 4.5)
    ]
    myRatingsRDD = sc.parallelize(myRatings)

    # 4.2 Combine personal ratings with training data
    trainingWithMyRDD = trainingRDD.union(myRatingsRDD)

    # 4.3 Train model with the same parameters
    seed = 2
    iterations = 5
    regularizationParameter = 0.1
    bestRank = 8  # Optimized in the earlier collaborative-filtering step
    myModel = ALS.train(trainingWithMyRDD, bestRank, seed=seed,
                        iterations=iterations, lambda_=regularizationParameter)

    # 4.4 Evaluate RMSE on the test set
    testPairs = testRDD.map(lambda x: (x[0], x[1]))
    predictedTestMyRDD = myModel.predictAll(testPairs)
    testRMSE = computeError(predictedTestMyRDD, testRDD)
    print('Test RMSE with personal ratings: %s' % testRMSE)

    # 4.5 Predict ratings for movies I have not rated
    ratedMovieIDs = set(m[1] for m in myRatings)
    allMovieIDs = moviesRDD.map(lambda x: x[0]).collect()
    unrated = [(myUserID, mid) for mid in allMovieIDs if mid not in ratedMovieIDs]
    unratedRDD = sc.parallelize(unrated)
    predictedUnratedRDD = myModel.predictAll(unratedRDD)

    # 4.6 Keep only movies with more than 75 ratings and output the top 25 recommendations
    counts = ratingsRDD.map(lambda x: (x[1], 1)).countByKey()
    candidates = predictedUnratedRDD.filter(lambda x: counts.get(x[1], 0) > 75)

    # Key predictions by movieID, join with (movieID, title), then keep (predictedRating, title)
    recs = candidates.map(lambda x: (x[1], x[2])) \
        .join(moviesRDD.map(lambda x: (x[0], x[1]))) \
        .map(lambda x: x[1]) \
        .takeOrdered(25, key=lambda x: -x[0])

    # Print top recommendations
    print('Top 25 movie recommendations for you:')
    for rating, title in recs:
        print('%s: predicted rating %.2f' % (title, rating))

    sc.stop()