import os

from pyspark import SparkContext, SparkConf

import data_prepare


def getCountsAndAverages(IDandRatingsTuple):
    """Given (movieID, iterable of ratings), return (movieID, (ratingCount, averageRating))."""
    movie, ratings = IDandRatingsTuple
    return (movie, (len(ratings), float(sum(ratings)) / len(ratings)))


if __name__ == "__main__":
    # Make the driver and the executors use the same Python interpreter.
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel("ERROR")
    # Ship the helper module to the executors so its functions can be used in closures.
    sc.addPyFile("data_prepare.py")

    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)

    # Keep (movieID, rating) pairs and group all ratings per movie.
    movieIDsWithRatingsRDD = (ratingsRDD
                              .map(lambda x: (x[1], x[2]))
                              .groupByKey())
    print('movieIDsWithRatingsRDD: %s\n' % movieIDsWithRatingsRDD.take(3))

    # (movieID, <ratings>) -> (movieID, (ratingCount, averageRating))
    movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(getCountsAndAverages)
    print('movieIDsWithAvgRatingsRDD: %s\n' % movieIDsWithAvgRatingsRDD.take(3))

    # Join with the movie titles:
    # (movieID, (title, (ratingCount, averageRating))) -> (averageRating, title, ratingCount)
    movieNameWithAvgRatingsRDD = (moviesRDD
                                  .join(movieIDsWithAvgRatingsRDD)
                                  .map(lambda movie: (movie[1][1][1], movie[1][0], movie[1][1][0])))
    print('movieNameWithAvgRatingsRDD: %s\n' % movieNameWithAvgRatingsRDD.take(3))

    # Keep movies with more than 500 ratings and sort by average rating, descending.
    movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
                                        .filter(lambda movie: movie[2] > 500)
                                        .sortBy(data_prepare.sortFunction, False))
    print('Movies with highest ratings: %s' % movieLimitedAndSortedByRatingRDD.take(20))

    sc.stop()
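
# ---------------------------------------------------------------------------
# data_prepare.py is not shown in this file. As a hypothetical sketch (the
# real module may load and parse the data differently), this script only
# assumes it provides something along these lines:
#
#     def init_rdds(sc):
#         # Return (ratingsRDD, moviesRDD): ratings as (userID, movieID, rating)
#         # tuples and movies as (movieID, title) pairs keyed by movieID.
#         ratings = (sc.textFile("ratings.dat")
#                    .map(lambda line: line.split("::"))
#                    .map(lambda p: (int(p[0]), int(p[1]), float(p[2]))))
#         movies = (sc.textFile("movies.dat")
#                   .map(lambda line: line.split("::"))
#                   .map(lambda p: (int(p[0]), p[1])))
#         return ratings, movies
#
#     def sortFunction(record):
#         # Composite sort key for (averageRating, title, ratingCount) records:
#         # fixed-width rating first, then title, so sortBy orders by rating.
#         return '%06.3f %s' % (record[0], record[1])
# ---------------------------------------------------------------------------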