als_movie/collab_filter.py
fly6516 2e56c9d39f feat(collab_filter): 添加协同过滤电影推荐功能
- 新增 collab_filter.py 文件实现电影推荐系统
- 使用 ALS 算法进行协同过滤
- 优化模型参数,找到最佳 rank
- 计算训练集和测试集的 RMSE
- 与平均评分进行对比,验证模型效果
2025-04-22 14:22:40 +08:00

78 lines
3.1 KiB
Python

from pyspark import SparkContext, SparkConf
import data_prepare
import basic_re
import math
from test_helper import Test
from pyspark.mllib.recommendation import ALS
def computeError(predictedRDD, actualRDD):
    """Return the root-mean-square error between predicted and actual ratings.

    Each element of both RDDs is indexable as ``(user, movie, rating)`` (plain
    tuples or pyspark ``Rating`` records). Predictions and actual ratings are
    matched on the ``(user, movie)`` pair with an inner join, so pairs present
    in only one of the two RDDs are ignored. Each pair is assumed to appear at
    most once per RDD; duplicates would be counted once per join match.

    Args:
        predictedRDD: RDD of ``(user, movie, predictedRating)``.
        actualRDD: RDD of ``(user, movie, actualRating)``.

    Returns:
        The RMSE over all matched pairs, as a float.

    Raises:
        ValueError: if no ``(user, movie)`` pair appears in both RDDs, in which
            case the RMSE is undefined.
    """
    predictedByPair = predictedRDD.map(lambda r: ((r[0], r[1]), r[2]))
    actualByPair = actualRDD.map(lambda r: ((r[0], r[1]), r[2]))
    squaredErrorsRDD = (predictedByPair
                        .join(actualByPair)
                        .map(lambda kv: (kv[1][0] - kv[1][1]) ** 2))
    # Accumulate (sum, count) in a single pass. Separate reduce() + count()
    # calls would evaluate the join twice, and reduce() raises on an empty RDD.
    totalError, numRatings = squaredErrorsRDD.aggregate(
        (0.0, 0),
        lambda acc, err: (acc[0] + err, acc[1] + 1),
        lambda a, b: (a[0] + b[0], a[1] + b[1]),
    )
    if numRatings == 0:
        raise ValueError("computeError: predictedRDD and actualRDD share no "
                         "(user, movie) pairs; RMSE is undefined")
    return math.sqrt(float(totalError) / numRatings)
if __name__ == "__main__":
    # Script entry point: split the ratings, pick the ALS rank with the lowest
    # validation RMSE, report that model's test RMSE, and compare it with a
    # baseline that predicts the training-set mean rating for every test pair.
    import os

    # Interpreter used by the workers / the driver; set before the
    # SparkContext is created so they take effect.
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
    sc = SparkContext.getOrCreate(conf)
    # Stop the context even if a step below fails, so the application does not
    # keep holding cluster resources.
    try:
        sc.setLogLevel("ERROR")
        # Ship the local helper modules to the executors.
        sc.addPyFile("data_prepare.py")
        sc.addPyFile("basic_re.py")

        ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)

        # 60/20/20 train/validation/test split (randomSplit normalizes the
        # weights). Each split is scanned by several jobs (counts, samples,
        # one ALS fit per rank, evaluation), so cache them rather than
        # recomputing the split every time.
        trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=2)
        trainingRDD.cache()
        validationRDD.cache()
        testRDD.cache()

        print('Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                            validationRDD.count(),
                                                            testRDD.count()))
        print(trainingRDD.take(3))
        print(validationRDD.take(3))
        print(testRDD.take(3))

        # --- Rank selection on the validation set ---------------------------
        # predictAll() takes (user, movie) pairs; drop the actual rating.
        validationForPredictRDD = validationRDD.map(lambda movie: (movie[0], movie[1]))

        seed = 2
        iterations = 5
        regularizationParameter = 0.1
        ranks = [4, 8, 12]
        errors = []  # validation RMSE for each entry of `ranks`, same order
        minError = float('inf')
        bestRank = -1
        bestModel = None
        for rank in ranks:
            model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                              lambda_=regularizationParameter)
            predictedRatingsRDD = model.predictAll(validationForPredictRDD)
            error = computeError(predictedRatingsRDD, validationRDD)
            errors.append(error)
            print('For rank %s the RMSE is %s' % (rank, error))
            if error < minError:
                minError = error
                bestRank = rank
                # Reuse this fitted model later instead of training it again.
                bestModel = model

        if bestModel is None:
            # Empty `ranks`, or every RMSE was inf/NaN: nothing to evaluate.
            raise RuntimeError('No candidate rank produced a usable validation '
                               'RMSE: %s' % list(zip(ranks, errors)))
        print('The best model was trained with rank %s' % bestRank)

        # --- Evaluation of the selected model on the held-out test set ------
        testForPredictingRDD = testRDD.map(lambda movie: (movie[0], movie[1]))
        predictedTestRDD = bestModel.predictAll(testForPredictingRDD)
        testRMSE = computeError(predictedTestRDD, testRDD)
        print('The model had a RMSE on the test set of %s' % testRMSE)

        # --- Baseline: predict the training-set mean for every test pair ----
        trainingAvgRating = trainingRDD.map(lambda x: x[2]).mean()
        print('The average rating for movies in the training set is %s' % trainingAvgRating)
        testForAvgRDD = testRDD.map(lambda x: (x[0], x[1], trainingAvgRating))
        testAvgRMSE = computeError(testForAvgRDD, testRDD)
        print('The RMSE on the average set is %s' % testAvgRMSE)
    finally:
        sc.stop()