From 2e56c9d39f1d5abb55cd89e98a0cad45f5c55f53 Mon Sep 17 00:00:00 2001
From: fly6516
Date: Tue, 22 Apr 2025 14:22:40 +0800
Subject: [PATCH] feat(collab_filter): add collaborative filtering movie recommendation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add collab_filter.py implementing the movie recommendation system
- Use the ALS algorithm for collaborative filtering
- Tune model parameters to find the best rank
- Compute RMSE on the validation and test sets
- Compare against the average-rating baseline to validate the model
---
 collab_filter.py |  78 +++++++++++++++++++++++++++
 error.txt        | 138 -----------------------------------------
 2 files changed, 78 insertions(+), 138 deletions(-)
 create mode 100644 collab_filter.py

diff --git a/collab_filter.py b/collab_filter.py
new file mode 100644
index 0000000..f4eabb1
--- /dev/null
+++ b/collab_filter.py
@@ -0,0 +1,78 @@
+from pyspark import SparkContext, SparkConf
+import data_prepare
+import basic_re
+import math
+from test_helper import Test
+from pyspark.mllib.recommendation import ALS
+
+
+def computeError(predictedRDD, actualRDD):
+    predictedReformattedRDD = predictedRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
+    actualReformattedRDD = actualRDD.map(lambda movie: ((movie[0], movie[1]), movie[2]))
+    squaredErrorsRDD = (predictedReformattedRDD
+                        .join(actualReformattedRDD)
+                        .map(lambda pre: (pre[1][0] - pre[1][1]) ** 2))
+    totalError = squaredErrorsRDD.reduce(lambda a, b: a + b)
+    numRatings = squaredErrorsRDD.count()
+    return math.sqrt(float(totalError) / numRatings)
+
+if __name__ == "__main__":
+    import sys, os
+    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
+    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
+    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
+    sc = SparkContext.getOrCreate(conf)
+    sc.setLogLevel("ERROR")
+    sc.addPyFile("data_prepare.py")
+    sc.addPyFile("basic_re.py")
+
+    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)
+    trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=2)
+    print('Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
+                                                        validationRDD.count(),
+                                                        testRDD.count()))
+    print(trainingRDD.take(3))
+    print(validationRDD.take(3))
+    print(testRDD.take(3))
+    print(validationRDD.count())
+
+    validationForPredictRDD = validationRDD.map(lambda movie: (movie[0], movie[1]))
+
+    seed = 2
+    iterations = 5
+    regularizationParameter = 0.1
+    ranks = [4, 8, 12]
+    errors = [0, 0, 0]
+    err = 0
+    tolerance = 0.1
+
+    minError = float('inf')
+    bestRank = -1
+    bestIteration = -1
+    for rank in ranks:
+        model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
+                          lambda_=regularizationParameter)
+        predictedRatingsRDD = model.predictAll(validationForPredictRDD)
+        error = computeError(predictedRatingsRDD, validationRDD)
+        errors[err] = error
+        err += 1
+        print('For rank %s the RMSE is %s' % (rank, error))
+        if error < minError:
+            minError = error
+            bestRank = rank
+
+    print('The best model was trained with rank %s' % bestRank)
+
+    myModel = ALS.train(trainingRDD, bestRank, seed=seed, iterations=iterations, lambda_=regularizationParameter)
+    testForPredictingRDD = testRDD.map(lambda movie: (movie[0], movie[1]))
+    predictedTestRDD = myModel.predictAll(testForPredictingRDD)
+    testRMSE = computeError(testRDD, predictedTestRDD)
+    print('The model had a RMSE on the test set of %s' % testRMSE)
+
+    trainingAvgRating = trainingRDD.map(lambda x: x[2]).mean()
+    print('The average rating for movies in the training set is %s' % trainingAvgRating)
+    testForAvgRDD = testRDD.map(lambda x: (x[0], x[1], trainingAvgRating))
+    testAvgRMSE = computeError(testRDD, testForAvgRDD)
+    print('The RMSE on the average set is %s' % testAvgRMSE)
+
+    sc.stop()
\ No newline at end of file
diff --git a/error.txt b/error.txt
index 982ce94..e69de29 100644
--- a/error.txt
+++ b/error.txt
@@ -1,138 +0,0 @@
-/usr/bin/python3 /root/PycharmProjects/als_movie/basic_re.py
-25/04/22 06:11:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
-Setting default log level to "WARN".
-To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
-[Stage 1:> (0 + 2) / 2]25/04/22 06:11:34 ERROR TaskSetManager: Task 1 in stage 1.0 failed 4 times; aborting job
-Traceback (most recent call last):
-  File "/root/PycharmProjects/als_movie/basic_re.py", line 22, in <module>
-    print('movieIDsWithRatingsRDD: %s\n' % movieIDsWithRatingsRDD.take(3))
-  File "/usr/local/bin/python3.6/lib/python3.6/site-packages/pyspark/rdd.py", line 1360, in take
-    res = self.context.runJob(self, takeUpToNumLeft, p)
-  File "/usr/local/bin/python3.6/lib/python3.6/site-packages/pyspark/context.py", line 1069, in runJob
-    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
-  File "/usr/local/bin/python3.6/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
-    answer, self.gateway_client, self.target_id, self.name)
-  File "/usr/local/bin/python3.6/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
-    format(target_id, ".", name), value)
-py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
-: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 6, 100.64.0.11, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
-  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
-    process()
-  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
-    serializer.dump_stream(func(split_index, iterator), outfile)
-  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
-    vs = list(itertools.islice(iterator, batch))
-  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
-    return f(*args, **kwargs)
-  File "/opt/module/spark-2.4.8-bin-hadoop2.7/work/app-20250422061114-0009/2/data_prepare.py", line 11, in get_ratings_tuple
-    user, movie, rating, _ = entry.split('::')
-ValueError: too many values to unpack (expected 4)
-
-    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
-    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
-    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
-    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
-    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
-    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
-    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
-    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
-    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
-    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
-    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
-    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
-    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
-    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
-    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
-    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
-    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
-    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:103)
-    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
-    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
-    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
-    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
-    at org.apache.spark.scheduler.Task.run(Task.scala:123)
-    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
-    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
-    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
-    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
-    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
-    at java.lang.Thread.run(Thread.java:748)
-
-Driver stacktrace:
-    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
-    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
-    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
-    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
-    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
-    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
-    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
-    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
-    at scala.Option.foreach(Option.scala:257)
-    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
-    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
-    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
-    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
-    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
-    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
-    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
-    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2088)
-    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2107)
-    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
-    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
-    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
-    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
-    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
-    at java.lang.reflect.Method.invoke(Method.java:498)
-    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
-    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
-    at py4j.Gateway.invoke(Gateway.java:282)
-    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
-    at py4j.commands.CallCommand.execute(CallCommand.java:79)
-    at py4j.GatewayConnection.run(GatewayConnection.java:238)
-    at java.lang.Thread.run(Thread.java:748)
-Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
-  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
-    process()
-  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
-    serializer.dump_stream(func(split_index, iterator), outfile)
-  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
-    vs = list(itertools.islice(iterator, batch))
-  File "/opt/module/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
-    return f(*args, **kwargs)
-  File "/opt/module/spark-2.4.8-bin-hadoop2.7/work/app-20250422061114-0009/2/data_prepare.py", line 11, in get_ratings_tuple
-    user, movie, rating, _ = entry.split('::')
-ValueError: too many values to unpack (expected 4)
-
-    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
-    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
-    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
-    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
-    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
-    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
-    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
-    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
-    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
-    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
-    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
-    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
-    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
-    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
-    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
-    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
-    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
-    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:103)
-    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
-    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
-    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
-    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
-    at org.apache.spark.scheduler.Task.run(Task.scala:123)
-    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
-    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
-    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
-    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
-    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
-    ... 1 more
-
-
-Process finished with exit code 1
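
A minimal sketch of how the computeError helper added by this patch could be checked locally on a toy dataset. The local master, the tiny (user, movie, rating) triples, and the expected value are illustrative assumptions only, and the import assumes data_prepare.py, basic_re.py, and test_helper are importable alongside collab_filter.py:

from pyspark import SparkContext
from collab_filter import computeError  # helper defined in this patch

sc = SparkContext("local[2]", "computeError_check")

# (user, movie, rating) triples; values are made up for illustration
actual = sc.parallelize([(1, 10, 4.0), (1, 20, 3.0), (2, 10, 5.0)])
predicted = sc.parallelize([(1, 10, 3.5), (1, 20, 3.0), (2, 10, 4.0)])

# RMSE over matching (user, movie) keys: sqrt(mean((predicted - actual)^2))
print(computeError(predicted, actual))  # sqrt((0.25 + 0.0 + 1.0) / 3) ~= 0.6455

sc.stop()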