feat(collab_filter): add collaborative-filtering movie recommendation
- Add collab_filter.py implementing the movie recommendation system
- Use the ALS algorithm for collaborative filtering
- Tune the model parameters to find the best rank
- Compute RMSE on the validation and test sets
- Compare against the average-rating baseline to verify the model
parent 90d4056157
commit 2e56c9d39f
collab_filter.py (78 lines, new file)
@@ -0,0 +1,78 @@
from pyspark import SparkContext, SparkConf
import data_prepare
import basic_re
import math
from test_helper import Test
from pyspark.mllib.recommendation import ALS


def computeError(predictedRDD, actualRDD):
    predictedReformattedRDD = predictedRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
    actualReformattedRDD = actualRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
    squaredErrorsRDD = (predictedReformattedRDD
                        .join(actualReformattedRDD)
                        .map(lambda pre: (pre[1][0] - pre[1][1]) ** 2))
    totalError = squaredErrorsRDD.reduce(lambda a, b: a + b)
    numRatings = squaredErrorsRDD.count()
    return math.sqrt(float(totalError) / numRatings)


if __name__ == "__main__":
    import sys, os
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel("ERROR")
    sc.addPyFile("data_prepare.py")
    sc.addPyFile("basic_re.py")

    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)
    trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=2)
    print('Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                        validationRDD.count(),
                                                        testRDD.count()))
    print(trainingRDD.take(3))
    print(validationRDD.take(3))
    print(testRDD.take(3))
    print(validationRDD.count())

    validationForPredictRDD = validationRDD.map(lambda movie: (movie[0], movie[1]))

    seed = 2
    iterations = 5
    regularizationParameter = 0.1
    ranks = [4, 8, 12]
    errors = [0, 0, 0]
    err = 0
    tolerance = 0.1

    minError = float('inf')
    bestRank = -1
    bestIteration = -1
    for rank in ranks:
        model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                          lambda_=regularizationParameter)
        predictedRatingsRDD = model.predictAll(validationForPredictRDD)
        error = computeError(predictedRatingsRDD, validationRDD)
        errors[err] = error
        err += 1
        print('For rank %s the RMSE is %s' % (rank, error))
        if error < minError:
            minError = error
            bestRank = rank

    print('The best model was trained with rank %s' % bestRank)

    myModel = ALS.train(trainingRDD, bestRank, seed=seed, iterations=iterations, lambda_=regularizationParameter)
    testForPredictingRDD = testRDD.map(lambda movie: (movie[0], movie[1]))
    predictedTestRDD = myModel.predictAll(testForPredictingRDD)
    testRMSE = computeError(testRDD, predictedTestRDD)
    print('The model had a RMSE on the test set of %s' % testRMSE)

    trainingAvgRating = trainingRDD.map(lambda x: x[2]).mean()
    print('The average rating for movies in the training set is %s' % trainingAvgRating)
    testForAvgRDD = testRDD.map(lambda x: (x[0], x[1], trainingAvgRating))
    testAvgRMSE = computeError(testRDD, testForAvgRDD)
    print('The RMSE on the average set is %s' % testAvgRMSE)

    sc.stop()
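For reference, computeError above keys both RDDs by (userID, movieID), joins them, and takes the root mean square of the rating differences. Below is a minimal local sketch of the same computation on toy data; the local[*] master and the invented ratings are illustrative assumptions, not part of this commit.

import math
from pyspark import SparkContext

sc = SparkContext("local[*]", "rmse_sketch")  # hypothetical local run; the commit itself targets spark://master:7077

# Invented (userID, movieID, rating) triples, for illustration only.
actual = sc.parallelize([(1, 10, 4.0), (1, 20, 3.0), (2, 10, 5.0)])
predicted = sc.parallelize([(1, 10, 3.5), (1, 20, 3.5), (2, 10, 4.0)])

# Same shape as computeError: key by (user, movie), join, square the differences.
squared = (predicted.map(lambda r: ((r[0], r[1]), r[2]))
           .join(actual.map(lambda r: ((r[0], r[1]), r[2])))
           .map(lambda kv: (kv[1][0] - kv[1][1]) ** 2))
rmse = math.sqrt(squared.reduce(lambda a, b: a + b) / squared.count())
print(rmse)  # squared errors 0.25, 0.25, 1.0 -> sqrt(0.5) ≈ 0.707

sc.stop()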
error.txt (138 lines, deleted)
@@ -1,138 +0,0 @@
/usr/bin/python3 /root/PycharmProjects/als_movie/basic_re.py
25/04/22 06:11:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[Stage 1:> (0 + 2) / 2]25/04/22 06:11:34 ERROR TaskSetManager: Task 1 in stage 1.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "/root/PycharmProjects/als_movie/basic_re.py", line 22, in <module>
    print('movieIDsWithRatingsRDD: %s\n' % movieIDsWithRatingsRDD.take(3))
  File "/usr/local/bin/python3.6/lib/python3.6/site-packages/pyspark/rdd.py", line 1360, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/usr/local/bin/python3.6/lib/python3.6/site-packages/pyspark/context.py", line 1069, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/usr/local/bin/python3.6/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/bin/python3.6/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 6, 100.64.0.11, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/module/spark-2.4.8-bin-hadoop2.7/work/app-20250422061114-0009/2/data_prepare.py", line 11, in get_ratings_tuple
    user, movie, rating, _ = entry.split('::')
ValueError: too many values to unpack (expected 4)

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:103)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2088)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2107)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/module/spark-2.4.8-bin-hadoop2.7/work/app-20250422061114-0009/2/data_prepare.py", line 11, in get_ratings_tuple
    user, movie, rating, _ = entry.split('::')
ValueError: too many values to unpack (expected 4)

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:103)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more


Process finished with exit code 1
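The deleted error.txt records a run of basic_re.py that failed because get_ratings_tuple in data_prepare.py unpacks exactly four '::'-separated fields and a rating line produced more. A more tolerant parser could look like the sketch below; this is a hypothetical illustration only, since data_prepare.py itself is not part of this diff.

def get_ratings_tuple(entry):
    # Hypothetical hardened version of the parser seen in the traceback above.
    # Expects MovieLens-style "userID::movieID::rating::timestamp" lines but
    # tolerates extra '::'-separated fields instead of raising ValueError.
    fields = entry.split('::')
    if len(fields) < 3:
        raise ValueError('malformed rating line: %r' % entry)
    user, movie, rating = fields[0], fields[1], fields[2]
    return int(user), int(movie), float(rating)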