refactor(basic_re): rework the movie-rating data-processing logic

- Removed the unnecessary environment-variable setup and test code
- Added a data_prepare module that initializes the RDDs (a sketch of the module follows the diff below)
- Added computation of per-movie average ratings and filtering of highly rated movies
- Tidied up the code structure to improve readability and maintainability
fly6516 2025-04-22 14:02:26 +08:00
parent 80bdb59d66
commit cf26e01935


@@ -1,68 +1,35 @@
 from pyspark import SparkContext, SparkConf
-import os
-os.environ['JAVA_HOME'] = "/opt/module/jdk1.8.0_171"
-os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
-os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/bin/python3"
-def get_ratings_tuple(entry):
-    items = entry.split('::')
-    return int(items[0]), int(items[1]), float(items[2])
-def get_movie_tuple(entry):
-    items = entry.split('::')
-    return int(items[0]), items[1]
-def sortFunction(tuple):
-    key = str('%06.3f' % tuple[0])
-    value = tuple[1]
-    return (key + ' ' + value)
-def init_rdds(sc):
-    ratingsFilename = "hdfs://master:9000/user/root/als_movie/ratings.txt"
-    moviesFilename = "hdfs://master:9000/user/root/als_movie/movies.dat"
-    numPartitions = 2
-    rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
-    rawMovies = sc.textFile(moviesFilename)
-    ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
-    moviesRDD = rawMovies.map(get_movie_tuple).cache()
-    return ratingsRDD, moviesRDD
+import data_prepare
-from test_helper import Test
 def getCountsAndAverages(IDandRatingsTuple):
     movie = IDandRatingsTuple[0]
     ratings = IDandRatingsTuple[1]
     return (movie, (len(ratings), float(sum(ratings)) / len(ratings)))
 if __name__ == "__main__":
     import sys, os
     os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
     os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
     conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
     sc = SparkContext.getOrCreate(conf)
     sc.setLogLevel("ERROR")
+    sc.addPyFile("data_prepare.py")
-    ratingsRDD, moviesRDD = init_rdds(sc)
+    ratingsRDD, moviesRDD = data_prepare.init_rdds(sc)
     movieIDsWithRatingsRDD = (ratingsRDD
                               .map(lambda x: (x[1], x[2]))
                               .groupByKey())
     print('movieIDsWithRatingsRDD: %s\n' % movieIDsWithRatingsRDD.take(3))
     ratingsCount = ratingsRDD.count()
     moviesCount = moviesRDD.count()
     movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(lambda rec: getCountsAndAverages(rec))
     print('movieIDsWithAvgRatingsRDD1: %s\n' % movieIDsWithAvgRatingsRDD.take(3))
     print('There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount))
     print('Ratings: %s' % ratingsRDD.take(3))
     print('Movies: %s' % moviesRDD.take(3))
     movieNameWithAvgRatingsRDD = (moviesRDD.join(movieIDsWithAvgRatingsRDD)
                                   .map(lambda movie: (movie[1][1][1], movie[1][0], movie[1][1][0])))
     print('movieNameWithAvgRatingsRDD2: %s\n' % movieNameWithAvgRatingsRDD.take(3))
-    tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')]
-    tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')]
-    oneRDD = sc.parallelize(tmp1)
-    twoRDD = sc.parallelize(tmp2)
-    oneSorted = oneRDD.sortByKey(True).collect()
-    twoSorted = twoRDD.sortByKey(True).collect()
-    print(oneSorted)
-    print(twoSorted)
-    assert set(oneSorted) == set(twoSorted)
-    assert twoSorted[0][0] < twoSorted.pop()[0]
-    assert oneSorted[0:2] != twoSorted[0:2]
-    print(oneRDD.sortBy(sortFunction, True).collect())
-    print(twoRDD.sortBy(sortFunction, True).collect())
-    oneSorted1 = oneRDD.takeOrdered(oneRDD.count(), key=sortFunction)
-    twoSorted1 = twoRDD.takeOrdered(twoRDD.count(), key=sortFunction)
-    print('one is %s' % oneSorted1)
-    print('two is %s' % twoSorted1)
-    assert oneSorted1 == twoSorted1
+    movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
+                                        .filter(lambda movie: movie[2] > 500)
+                                        .sortBy(data_prepare.sortFunction, False))
     print('Movies with highest ratings: %s' % movieLimitedAndSortedByRatingRDD.take(20))
     sc.stop()
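
For reference: the data_prepare module imported above is not itself part of this diff, but the helpers deleted from this file tell us what it must provide. A minimal sketch of data_prepare.py, assuming the module simply collects those removed helpers unchanged:

# data_prepare.py -- hypothetical sketch assembled from the helpers
# removed above; the real module is not shown in this commit.

def get_ratings_tuple(entry):
    # Parse a '::'-delimited rating line into (userID, movieID, rating).
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])

def get_movie_tuple(entry):
    # Parse a '::'-delimited movie line into (movieID, title).
    items = entry.split('::')
    return int(items[0]), items[1]

def sortFunction(tuple):
    # Build a key like "04.500 Title": zero-padded rating first, so
    # equal ratings tie-break alphabetically by title.
    key = str('%06.3f' % tuple[0])
    value = tuple[1]
    return (key + ' ' + value)

def init_rdds(sc):
    # Load ratings and movies from HDFS and cache the parsed RDDs.
    ratingsFilename = "hdfs://master:9000/user/root/als_movie/ratings.txt"
    moviesFilename = "hdfs://master:9000/user/root/als_movie/movies.dat"
    numPartitions = 2
    rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
    rawMovies = sc.textFile(moviesFilename)
    ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
    moviesRDD = rawMovies.map(get_movie_tuple).cache()
    return ratingsRDD, moviesRDD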
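
The trickiest line in the new code is the nested indexing after the join: moviesRDD holds (movieID, title) pairs and movieIDsWithAvgRatingsRDD holds (movieID, (count, average)) pairs, so the join yields (movieID, (title, (count, average))), which the map flattens to (average, title, count). A quick local sanity check with made-up values (plain Python, no Spark needed):

def getCountsAndAverages(IDandRatingsTuple):
    movie = IDandRatingsTuple[0]
    ratings = IDandRatingsTuple[1]
    return (movie, (len(ratings), float(sum(ratings)) / len(ratings)))

# (movieID, iterable of ratings) -> (movieID, (count, average))
assert getCountsAndAverages((1, [4.0, 5.0, 3.0])) == (1, (3, 4.0))

# Shape of one joined record: (movieID, (title, (count, average))).
# The title is a made-up placeholder, not data from the commit.
joined = (1, (u'Example Movie', (3, 4.0)))
# The map in the diff extracts (average, title, count):
assert (joined[1][1][1], joined[1][0], joined[1][1][0]) == (4.0, u'Example Movie', 3)
print('tuple shapes check out')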