from pyspark import SparkContext, SparkConf
import data_prepare
from test_helper import Test


def getCountsAndAverages(IDandRatingsTuple):
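    """Given a (movieID, iterable of ratings) tuple, return
    (movieID, (number of ratings, average rating))."""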
    movie = IDandRatingsTuple[0]
    ratings = IDandRatingsTuple[1]
    return (movie, (len(ratings), float(sum(ratings)) / len(ratings)))


if __name__ == "__main__":
    import os

    # Make the driver and the executors use the same Python 3 interpreter.
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
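
    # Target the standalone cluster master and set the application name.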
    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel("ERROR")
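    # Ship data_prepare.py to the executors so tasks can import it there.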
    sc.addPyFile("data_prepare.py")
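
    # Build the ratings and movies RDDs via the helper module.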
    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)
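
    # Re-key each rating as (movieID, rating) and group all ratings by movie.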
    movieIDsWithRatingsRDD = (ratingsRDD
                              .map(lambda x: (x[1], x[2]))
                              .groupByKey())
    print('movieIDsWithRatingsRDD: %s\n' % movieIDsWithRatingsRDD.take(3))
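
    # Collapse each group to (movieID, (ratings count, average rating)).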
    movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(getCountsAndAverages)
    print('movieIDsWithAvgRatingsRDD: %s\n' % movieIDsWithAvgRatingsRDD.take(3))
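
    # Join in the movie names, then reshape each record to
    # (average rating, movie name, ratings count).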
    movieNameWithAvgRatingsRDD = (moviesRDD.join(movieIDsWithAvgRatingsRDD)
                                  .map(lambda movie: (movie[1][1][1], movie[1][0], movie[1][1][0])))
    print('movieNameWithAvgRatingsRDD: %s\n' % movieNameWithAvgRatingsRDD.take(3))
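
    # Keep movies with more than 500 ratings and sort in descending order
    # with the helper's sortFunction.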
    movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
                                        .filter(lambda movie: movie[2] > 500)
                                        .sortBy(data_prepare.sortFunction, False))
    print('Movies with highest ratings: %s' % movieLimitedAndSortedByRatingRDD.take(20))

    sc.stop()