from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, Rating
import math

# Initialize Spark context
sc = SparkContext(appName="MovieLensALS")

# Load ratings data: format (userId, movieId, rating)
rawData = sc.textFile("hdfs://master:9000/user/root/ratings.csv")
header = rawData.first()
rawData = rawData.filter(lambda line: line != header)

# Convert to Rating objects and filter incomplete lines
ratingsRDD = rawData.map(lambda line: line.split(",")) \
    .filter(lambda fields: len(fields) >= 3) \
    .map(lambda fields: Rating(int(fields[0]), int(fields[1]), float(fields[2])))

# Split data into training, validation, and test sets (60%, 20%, 20%)
trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0)
print("Training: %d, validation: %d, test: %d" % (trainingRDD.count(), validationRDD.count(), testRDD.count()))

# Set ALS model parameters
rank = 10
numIterations = 10
lambda_ = 0.1

# Train ALS model on the training data
myModel = ALS.train(trainingRDD, rank, iterations=numIterations, lambda_=lambda_)

# Function to compute RMSE between predicted and actual ratings
def computeError(predictedRDD, actualRDD):
    # Key both RDDs by (user, movie) so predictions line up with actual ratings
    predictedReformattedRDD = predictedRDD.filter(lambda r: len(r) == 3) \
        .map(lambda r: ((r[0], r[1]), r[2]))
    actualReformattedRDD = actualRDD.filter(lambda r: len(r) == 3) \
        .map(lambda r: ((r[0], r[1]), r[2]))
    joined = predictedReformattedRDD.join(actualReformattedRDD)
    squaredErrorsRDD = joined.map(lambda pair: (pair[1][0] - pair[1][1]) ** 2)
    # Check the count first: reduce() raises on an empty RDD, so guard before reducing
    numRatings = squaredErrorsRDD.count()
    if numRatings == 0:
        return float('nan')
    totalError = squaredErrorsRDD.reduce(lambda a, b: a + b)
    return math.sqrt(float(totalError) / numRatings)

# Predict and evaluate on the validation set
validationPairs = validationRDD.map(lambda rating: (rating[0], rating[1]))
predictedValRDD = myModel.predictAll(validationPairs)
print("\nSample validation predictions:")
print(predictedValRDD.take(3))
print(validationRDD.take(3))

try:
    validationRMSE = computeError(predictedValRDD, validationRDD)
    print("Validation RMSE: %f" % validationRMSE)
except Exception as e:
    print("Error computing validation RMSE:", str(e))

# Predict and evaluate on the test set
testPairs = testRDD.map(lambda rating: (rating[0], rating[1]))
predictedTestRDD = myModel.predictAll(testPairs)
print("\nSample test predictions:")
print(predictedTestRDD.take(3))
print(testRDD.take(3))
print("Predicted count: %d" % predictedTestRDD.count())
print("Test count: %d" % testRDD.count())

try:
    testRMSE = computeError(predictedTestRDD, testRDD)
    print("Test RMSE: %f" % testRMSE)
except Exception as e:
    print("Error computing test RMSE:", str(e))

# Stop the Spark context
sc.stop()
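# --- Illustrative sketch (not part of the original script): top-N recommendations ---
# A minimal, hedged example of how the trained model could also be used to
# recommend movies. It assumes it would run *before* sc.stop() above; the user
# id (1) and N (5) are arbitrary values chosen only for illustration.
# recommendProducts(user, num) is provided by the MatrixFactorizationModel
# returned by ALS.train in pyspark.mllib.
#
# topRecommendations = myModel.recommendProducts(1, 5)
# for rec in topRecommendations:
#     print("user %d -> movie %d (predicted rating %.2f)" % (rec.user, rec.product, rec.rating))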