als_movie/basic_re.py
fly6516 90d4056157 style(basic_re.py): 优化代码格式
- 删除多余的空行
- 将长行代码进行适当折行,提高可读性
2025-04-22 14:18:02 +08:00

35 lines
1.6 KiB
Python

from pyspark import SparkContext, SparkConf
import data_prepare
from test_helper import Test
def getCountsAndAverages(IDandRatingsTuple):
    """Turn a (MovieID, ratings) pair into (MovieID, (count, average)).

    Args:
        IDandRatingsTuple: a 2-tuple of (MovieID, iterable of numeric
            ratings), e.g. the output of ``groupByKey()``. The ratings
            iterable must be non-empty and support ``len()``.

    Returns:
        (MovieID, (number of ratings, average rating as float)).
    """
    movie_id, ratings = IDandRatingsTuple
    # Compute the length once instead of twice as in the original.
    count = len(ratings)
    return (movie_id, (count, float(sum(ratings)) / count))
if __name__ == "__main__":
    # Script entry point: connect to the Spark cluster, compute per-movie
    # rating counts/averages, and print the highest-rated popular movies.
    import os  # NOTE: dropped the unused `sys` import from the original

    # Make driver and executors use the same Python interpreter.
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel("ERROR")
    # Ship the helper module to the executors so its functions resolve there.
    sc.addPyFile("data_prepare.py")

    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)

    # ratings records look like (UserID, MovieID, Rating) here — we key by
    # MovieID and collect all its ratings: (MovieID, iterable of ratings).
    movieIDsWithRatingsRDD = (ratingsRDD
                              .map(lambda x: (x[1], x[2]))
                              .groupByKey())
    print('movieIDsWithRatingsRDD: %s\n' % movieIDsWithRatingsRDD.take(3))

    # (MovieID, (rating count, average rating)); pass the function directly
    # instead of wrapping it in a redundant lambda.
    movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(getCountsAndAverages)
    print('movieIDsWithAvgRatingsRDD1: %s\n' % movieIDsWithAvgRatingsRDD.take(3))

    # join gives (MovieID, (name, (count, avg))); reshape to
    # (average rating, movie name, rating count).
    movieNameWithAvgRatingsRDD = (moviesRDD
                                  .join(movieIDsWithAvgRatingsRDD)
                                  .map(lambda movie: (movie[1][1][1], movie[1][0], movie[1][1][0])))
    print('movieNameWithAvgRatingsRDD2: %s\n' % movieNameWithAvgRatingsRDD.take(3))

    # Keep only movies with more than 500 ratings, sorted best-first
    # (descending) by the project-supplied sort key.
    movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
                                        .filter(lambda movie: movie[2] > 500)
                                        .sortBy(data_prepare.sortFunction, False))
    print('Movies with highest ratings: %s' % movieLimitedAndSortedByRatingRDD.take(20))

    sc.stop()