diff --git a/basic_re.py b/basic_re.py
index 0e291b3..fccca8e 100644
--- a/basic_re.py
+++ b/basic_re.py
@@ -1,68 +1,35 @@
 from pyspark import SparkContext, SparkConf
-import os
-os.environ['JAVA_HOME'] = "/opt/module/jdk1.8.0_171"
-os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
-os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/bin/python3"
-def get_ratings_tuple(entry):
-    items = entry.split('::')
-    return int(items[0]), int(items[1]), float(items[2])
-
-def get_movie_tuple(entry):
-    items = entry.split('::')
-    return int(items[0]), items[1]
-
-def sortFunction(tuple):
-    key = str('%06.3f' % tuple[0])
-    value = tuple[1]
-    return (key + ' ' + value)
-
-def init_rdds(sc):
-    ratingsFilename = "hdfs://master:9000/user/root/als_movie/ratings.txt"
-    moviesFilename = "hdfs://master:9000/user/root/als_movie/movies.dat"
-    numPartitions = 2
-    rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
-    rawMovies = sc.textFile(moviesFilename)
-    ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
-    moviesRDD = rawMovies.map(get_movie_tuple).cache()
-    return ratingsRDD, moviesRDD
+import data_prepare
+from test_helper import Test
 
+def getCountsAndAverages(IDandRatingsTuple):
+    movie = IDandRatingsTuple[0]
+    ratings = IDandRatingsTuple[1]
+    return (movie, (len(ratings), float(sum(ratings)) / len(ratings)))
 
 if __name__ == "__main__":
     import sys, os
     os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
     os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
-
     conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
     sc = SparkContext.getOrCreate(conf)
     sc.setLogLevel("ERROR")
+    sc.addPyFile("data_prepare.py")
 
-    ratingsRDD, moviesRDD = init_rdds(sc)
+    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)
 
+    movieIDsWithRatingsRDD = (ratingsRDD
+                              .map(lambda x: (x[1], x[2]))
+                              .groupByKey())
+    print('movieIDsWithRatingsRDD: %s\n' % movieIDsWithRatingsRDD.take(3))
 
-    ratingsCount = ratingsRDD.count()
-    moviesCount = moviesRDD.count()
+    movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(lambda rec: getCountsAndAverages(rec))
+    print('movieIDsWithAvgRatingsRDD1: %s\n' % movieIDsWithAvgRatingsRDD.take(3))
 
-    print('There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount))
-    print('Ratings: %s' % ratingsRDD.take(3))
-    print('Movies: %s' % moviesRDD.take(3))
+    movieNameWithAvgRatingsRDD = (moviesRDD.join(movieIDsWithAvgRatingsRDD)
+                                  .map(lambda movie: (movie[1][1][1], movie[1][0], movie[1][1][0])))
+    print('movieNameWithAvgRatingsRDD2: %s\n' % movieNameWithAvgRatingsRDD.take(3))
 
-    tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')]
-    tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')]
-
-    oneRDD = sc.parallelize(tmp1)
-    twoRDD = sc.parallelize(tmp2)
-    oneSorted = oneRDD.sortByKey(True).collect()
-    twoSorted = twoRDD.sortByKey(True).collect()
-    print(oneSorted)
-    print(twoSorted)
-    assert set(oneSorted) == set(twoSorted)
-    assert twoSorted[0][0] < twoSorted.pop()[0]
-    assert oneSorted[0:2] != twoSorted[0:2]
-
-    print(oneRDD.sortBy(sortFunction, True).collect())
-    print(twoRDD.sortBy(sortFunction, True).collect())
-
-    oneSorted1 = oneRDD.takeOrdered(oneRDD.count(), key=sortFunction)
-    twoSorted1 = twoRDD.takeOrdered(twoRDD.count(), key=sortFunction)
-    print('one is %s' % oneSorted1)
-    print('two is %s' % twoSorted1)
-    assert oneSorted1 == twoSorted1
+    movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
+                                        .filter(lambda movie: movie[2] > 500)
+                                        .sortBy(data_prepare.sortFunction, False))
+    print('Movies with highest ratings: %s' % movieLimitedAndSortedByRatingRDD.take(20))
 
     sc.stop()
\ No newline at end of file
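
Note for reviewers: the helpers removed above (get_ratings_tuple, get_movie_tuple, sortFunction, init_rdds) are not gone; the rewritten script imports them from the new data_prepare module, which is shipped to executors via sc.addPyFile("data_prepare.py"). That module is not included in this diff; a sketch of what it presumably contains, reconstructed from the deleted lines:

# data_prepare.py -- hypothetical reconstruction from the code deleted above;
# the actual module is not shown in this diff.

def get_ratings_tuple(entry):
    # Parse the first three '::'-separated fields -> (userID, movieID, rating)
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])

def get_movie_tuple(entry):
    # 'MovieID::Title' -> (movieID, title)
    items = entry.split('::')
    return int(items[0]), items[1]

def sortFunction(tuple):
    # Fixed-width rating string plus name: lexicographic order then matches
    # numeric rating order, with ties broken alphabetically by title.
    key = str('%06.3f' % tuple[0])
    value = tuple[1]
    return (key + ' ' + value)

def init_rdds(sc):
    # HDFS paths copied from the deleted code.
    ratingsFilename = "hdfs://master:9000/user/root/als_movie/ratings.txt"
    moviesFilename = "hdfs://master:9000/user/root/als_movie/movies.dat"
    numPartitions = 2
    rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
    rawMovies = sc.textFile(moviesFilename)
    ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
    moviesRDD = rawMovies.map(get_movie_tuple).cache()
    return ratingsRDD, moviesRDD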
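
The movie[1][1][1] indexing in the join step is easy to misread. After moviesRDD.join(movieIDsWithAvgRatingsRDD), each record has the shape (movieID, (title, (count, average))), so the map emits (average, title, count) and the final filter keeps movies with more than 500 ratings. A minimal local sketch with toy data (hypothetical values, local master; not part of the change):

# Shape walkthrough for the new aggregation pipeline (toy data, local master).
from pyspark import SparkContext

sc = SparkContext("local[*]", "shape_demo")

# (userID, movieID, rating) and (movieID, title), as init_rdds would produce
ratings = sc.parallelize([(1, 10, 4.0), (2, 10, 5.0), (1, 20, 3.0)])
movies = sc.parallelize([(10, 'Alpha'), (20, 'Beta')])

# groupByKey -> (movieID, <iterable of ratings>)
grouped = ratings.map(lambda x: (x[1], x[2])).groupByKey()

# getCountsAndAverages -> (movieID, (count, average))
avgs = grouped.map(lambda kv: (kv[0], (len(kv[1]), float(sum(kv[1])) / len(kv[1]))))

# join -> (movieID, (title, (count, average))), so in the map below
# movie[1][1][1] is the average, movie[1][0] the title, movie[1][1][0] the count
named = movies.join(avgs).map(lambda movie: (movie[1][1][1], movie[1][0], movie[1][1][0]))
print(named.collect())  # e.g. [(4.5, 'Alpha', 2), (3.0, 'Beta', 1)] (order may vary)

sc.stop()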