import math
import os

from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS

import data_prepare
import basic_re
from test_helper import Test


def computeError(predictedRDD, actualRDD):
    """Return the root-mean-squared error between two rating RDDs.

    Each record of both RDDs is accessed by index: ``r[0]`` and ``r[1]``
    form the join key (presumably ``(user, movie)``, matching the
    ``Rating`` records produced by ``predictAll``) and ``r[2]`` is the
    rating. Only keys present in both RDDs contribute to the error.

    The RMSE is symmetric in its two arguments, but callers should still
    pass the predictions first and the ground truth second.

    Raises:
        ValueError: if the two RDDs share no keys, so no error can be
            computed.
    """
    predictedReformattedRDD = predictedRDD.map(lambda r: ((r[0], r[1]), r[2]))
    actualReformattedRDD = actualRDD.map(lambda r: ((r[0], r[1]), r[2]))

    # Accumulate (sum of squared errors, number of pairs) in one pass so the
    # join is evaluated by a single Spark job instead of reduce() + count().
    totalError, numRatings = (
        predictedReformattedRDD
        .join(actualReformattedRDD)
        .map(lambda kv: ((kv[1][0] - kv[1][1]) ** 2, 1))
        .fold((0.0, 0), lambda a, b: (a[0] + b[0], a[1] + b[1]))
    )
    if numRatings == 0:
        raise ValueError(
            "computeError: predictedRDD and actualRDD share no (user, movie) pairs"
        )
    return math.sqrt(float(totalError) / numRatings)


def main():
    """Pick the best ALS rank on a validation split and report test RMSE.

    Splits the ratings from ``data_prepare.init_rdds`` into roughly
    60/20/20 training/validation/test sets. Trains one ALS model per
    candidate rank and keeps the one with the lowest validation RMSE.
    Then prints that model's test RMSE next to the RMSE of a baseline that
    always predicts the mean training rating.
    """
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
    sc = SparkContext.getOrCreate(conf)
    try:
        sc.setLogLevel("ERROR")
        # Ship the local helper modules to the executors.
        sc.addPyFile("data_prepare.py")
        sc.addPyFile("basic_re.py")

        ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)
        trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=2)
        # Each split is counted/sampled several times below, and the training
        # set is re-read by every ALS.train call: cache to avoid recomputation.
        trainingRDD.cache()
        validationRDD.cache()
        testRDD.cache()

        print('Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                            validationRDD.count(),
                                                            testRDD.count()))
        print(trainingRDD.take(3))
        print(validationRDD.take(3))
        print(testRDD.take(3))
        print(validationRDD.count())

        # predictAll expects (user, product) pairs without the rating.
        validationForPredictRDD = validationRDD.map(lambda movie: (movie[0], movie[1]))

        seed = 2
        iterations = 5
        regularizationParameter = 0.1
        ranks = [4, 8, 12]
        errors = []
        minError = float('inf')
        bestRank = -1
        bestModel = None

        for rank in ranks:
            model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                              lambda_=regularizationParameter)
            predictedRatingsRDD = model.predictAll(validationForPredictRDD)
            error = computeError(predictedRatingsRDD, validationRDD)
            errors.append(error)
            print('For rank %s the RMSE is %s' % (rank, error))
            if error < minError:
                # Keep the winning model itself rather than retraining it later
                # with identical parameters.
                minError = error
                bestRank = rank
                bestModel = model

        print('The best model was trained with rank %s' % bestRank)

        testForPredictingRDD = testRDD.map(lambda movie: (movie[0], movie[1]))
        predictedTestRDD = bestModel.predictAll(testForPredictingRDD)
        testRMSE = computeError(predictedTestRDD, testRDD)
        print('The model had a RMSE on the test set of %s' % testRMSE)

        # Baseline: predict the mean training rating for every test pair.
        trainingAvgRating = trainingRDD.map(lambda x: x[2]).mean()
        print('The average rating for movies in the training set is %s' % trainingAvgRating)
        testForAvgRDD = testRDD.map(lambda x: (x[0], x[1], trainingAvgRating))
        testAvgRMSE = computeError(testForAvgRDD, testRDD)
        print('The RMSE on the average set is %s' % testAvgRMSE)
    finally:
        # Release cluster resources even if any step above fails.
        sc.stop()


if __name__ == "__main__":
    main()