- Add collab_filter.py, implementing a movie recommendation system
- Use the ALS algorithm for collaborative filtering
- Tune the model parameters to find the best rank
- Compute the RMSE on the validation and test sets
- Compare against the average rating to validate the model
78 lines
3.1 KiB
Python
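"""Movie recommender built on Spark MLlib ALS (collaborative filtering).

Tunes the ALS rank on a validation split, reports RMSE on the test split,
and compares the model against an average-rating baseline.
"""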
import math

from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS
from test_helper import Test

import data_prepare
import basic_re
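# RMSE between two RDDs of (user, movie, rating) triples, joined on the
# (user, movie) key so predicted and actual ratings are compared pairwise.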
def computeError(predictedRDD, actualRDD):
    predictedReformattedRDD = predictedRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
    actualReformattedRDD = actualRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
    squaredErrorsRDD = (predictedReformattedRDD
                        .join(actualReformattedRDD)
                        .map(lambda pre: (pre[1][0] - pre[1][1]) ** 2))
    totalError = squaredErrorsRDD.reduce(lambda a, b: a + b)
    numRatings = squaredErrorsRDD.count()
    return math.sqrt(float(totalError) / numRatings)
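# For reference, computeError is plain RMSE. On ordinary Python values the
# same computation reduces to the following toy sketch (illustrative only,
# not used by the job):
#
#     pairs = [(3.2, 3.0), (4.1, 5.0), (2.4, 2.0)]   # (predicted, actual)
#     math.sqrt(sum((p - a) ** 2 for p, a in pairs) / len(pairs))   # ~0.58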
if __name__ == "__main__":
    import os

    # Make the driver and the executors use the same Python interpreter.
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel("ERROR")

    # Ship the local helper modules to the executors.
    sc.addPyFile("data_prepare.py")
    sc.addPyFile("basic_re.py")
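    # Load the ratings and movies RDDs and split the ratings 60/20/20 into
    # training / validation / test sets (randomSplit normalizes the weights).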
    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)
    trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=2)
    print('Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                        validationRDD.count(),
                                                        testRDD.count()))
    print(trainingRDD.take(3))
    print(validationRDD.take(3))
    print(testRDD.take(3))
    print(validationRDD.count())
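    # predictAll() expects (user, movie) pairs, so drop the ratings here.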
    validationForPredictRDD = validationRDD.map(lambda movie: (movie[0], movie[1]))
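    # ALS hyperparameters; only the rank is tuned below, the rest stay fixed.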
    seed = 2
    iterations = 5
    regularizationParameter = 0.1
    ranks = [4, 8, 12]
    errors = [0, 0, 0]
    err = 0
    tolerance = 0.1

    minError = float('inf')
    bestRank = -1
    bestIteration = -1
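    # Train one model per candidate rank and keep the rank with the lowest
    # RMSE on the validation set.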
    for rank in ranks:
        model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                          lambda_=regularizationParameter)
        predictedRatingsRDD = model.predictAll(validationForPredictRDD)
        error = computeError(predictedRatingsRDD, validationRDD)
        errors[err] = error
        err += 1
        print('For rank %s the RMSE is %s' % (rank, error))
        if error < minError:
            minError = error
            bestRank = rank

    print('The best model was trained with rank %s' % bestRank)
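    # Retrain with the best rank and evaluate on the held-out test set.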
    myModel = ALS.train(trainingRDD, bestRank, seed=seed, iterations=iterations,
                        lambda_=regularizationParameter)
    testForPredictingRDD = testRDD.map(lambda movie: (movie[0], movie[1]))
    predictedTestRDD = myModel.predictAll(testForPredictingRDD)
    testRMSE = computeError(predictedTestRDD, testRDD)
    print('The model had a RMSE on the test set of %s' % testRMSE)
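    # Baseline: predict the training-set mean rating for every test pair and
    # compare its RMSE against the ALS model's test RMSE.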
    trainingAvgRating = trainingRDD.map(lambda x: x[2]).mean()
    print('The average rating for movies in the training set is %s' % trainingAvgRating)
    testForAvgRDD = testRDD.map(lambda x: (x[0], x[1], trainingAvgRating))
    testAvgRMSE = computeError(testForAvgRDD, testRDD)
    print('The RMSE of the average-rating baseline on the test set is %s' % testAvgRMSE)
    sc.stop()
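As a possible follow-up (a minimal sketch, not part of the committed file): the trained `MatrixFactorizationModel` can also produce top-N recommendations directly via `recommendProducts`. The user id below is purely illustrative, and the call would have to run before `sc.stop()`:

```python
# Hypothetical usage: top-10 movie recommendations for a single user.
userId = 1  # illustrative value; any user id present in the training data
for r in myModel.recommendProducts(userId, 10):
    # recommendProducts returns Rating(user, product, rating) objects,
    # where `rating` is the predicted score.
    print('movie %s -> predicted rating %.2f' % (r.product, r.rating))
```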