als_movie/data_prepare.py
fly6516 90d4056157 style(basic_re.py): 优化代码格式
- 删除多余的空行
- 将长行代码进行适当折行,提高可读性
2025-04-22 14:18:02 +08:00

72 lines
2.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from pyspark import SparkContext, SparkConf
import os
# Set JAVA_HOME in this process's environment at import time; child processes
# started afterwards inherit it.
# NOTE(review): presumably this is so PySpark launches its JVM from this JDK --
# confirm the path exists on every machine that runs this script.
os.environ['JAVA_HOME'] = "/opt/module/jdk1.8.0_171"
def get_ratings_tuple(entry):
    """Parse one ``::``-delimited ratings line into a typed tuple.

    Args:
        entry: A text line whose first three ``::``-separated fields are an
            integer, an integer and a number, e.g. ``"1::1193::5::978300760"``.
            Any fields after the third are ignored.

    Returns:
        A ``(int, int, float)`` tuple built from the first three fields
        (presumably user id, movie id and rating -- confirm against the
        ratings file).

    Raises:
        ValueError: If one of the first three fields cannot be converted.
        IndexError: If a required field is missing from the line.
    """
    items = entry.split('::')
    # Fields are converted left to right, so the first bad field is the one
    # reported in the exception.
    return int(items[0]), int(items[1]), float(items[2])
def get_movie_tuple(entry):
    """Parse one ``::``-delimited movies line into ``(id, second_field)``.

    Args:
        entry: A text line whose first ``::``-separated field is an integer
            and whose second field is kept as text, e.g.
            ``"1::Toy Story (1995)::Animation"``. Later fields are ignored.

    Returns:
        A ``(int, str)`` tuple of the first field converted to ``int`` and the
        second field unchanged (presumably movie id and title -- confirm
        against the movies file).

    Raises:
        ValueError: If the first field is not an integer.
        IndexError: If the line has no second field.
    """
    items = entry.split('::')
    return int(items[0]), items[1]
def sortFunction(tuple):
    """Build a string sort key from a ``(number, text)`` pair.

    The number is rendered zero-padded to width 6 with three decimals, so that
    comparing the resulting strings orders pairs by number first and by text
    second (for non-negative numbers that fit the padded width).

    Args:
        tuple: An indexable pair ``(number, text)``. The parameter name shadows
            the builtin ``tuple``; it is kept unchanged so existing callers
            are not affected.

    Returns:
        The formatted number, two spaces, then the text,
        e.g. ``(1, 'alpha')`` -> ``'01.000  alpha'``.
    """
    # The format string already ends with a space and another is added below,
    # hence two spaces between the number and the text.
    key = '%06.3f ' % tuple[0]
    value = tuple[1]
    return key + ' ' + value
def init_rdds(
    sc,
    *,
    ratings_path="hdfs://master:9000/user/root/als_movie/ratings.txt",
    movies_path="hdfs://master:9000/user/root/als_movie/movies.dat",
    num_partitions=2,
):
    """Load the ratings and movies text files as parsed, cached RDDs.

    Args:
        sc: The Spark context used to read the files (anything exposing
            ``textFile``).
        ratings_path: Location of the ratings file. Defaults to the HDFS path
            this script has always used.
        movies_path: Location of the movies file. Defaults to the HDFS path
            this script has always used.
        num_partitions: Number of partitions the raw ratings are
            repartitioned into before parsing. The movies RDD keeps whatever
            partitioning ``textFile`` gives it.

    Returns:
        ``(ratingsRDD, moviesRDD)``: the ratings lines mapped through
        ``get_ratings_tuple`` and the movies lines mapped through
        ``get_movie_tuple``, both marked with ``cache()``.
    """
    raw_ratings = sc.textFile(ratings_path).repartition(num_partitions)
    raw_movies = sc.textFile(movies_path)

    ratings_rdd = raw_ratings.map(get_ratings_tuple).cache()
    movies_rdd = raw_movies.map(get_movie_tuple).cache()
    return ratings_rdd, movies_rdd
if __name__ == "__main__":
    # Script entry point: load the datasets, print a few sanity figures, then
    # run a small demonstration comparing sortByKey with sorting on a key
    # built from both tuple elements.

    # Python interpreter settings read by PySpark (set before the context is
    # created).
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

    # Connect to the Spark standalone cluster master at master:7077;
    # the application name is als_movie.
    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
    sc = SparkContext.getOrCreate(conf)
    try:
        sc.setLogLevel("ERROR")

        ratingsRDD, moviesRDD = init_rdds(sc)
        ratingsCount = ratingsRDD.count()
        moviesCount = moviesRDD.count()
        print('There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount))
        print('Ratings: %s' % ratingsRDD.take(3))
        print('Movies: %s' % moviesRDD.take(3))

        # Two lists holding the same pairs in a different order.
        tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')]
        tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')]
        oneRDD = sc.parallelize(tmp1)
        twoRDD = sc.parallelize(tmp2)

        oneSorted = oneRDD.sortByKey(True).collect()
        twoSorted = twoRDD.sortByKey(True).collect()
        print(oneSorted)
        print(twoSorted)
        assert set(oneSorted) == set(twoSorted)
        # Compare first and last keys without mutating the list: the previous
        # pop() inside the assert changed twoSorted only when asserts were
        # enabled.
        assert twoSorted[0][0] < twoSorted[-1][0]
        # NOTE(review): this expects the two key-only sorts to disagree on the
        # order of values that share a key -- presumably that depends on input
        # order/partitioning; confirm it holds on the target cluster.
        assert oneSorted[0:2] != twoSorted[0:2]

        # Sorting on the combined key from sortFunction takes the value into
        # account as well, so both inputs should produce the same order.
        print(oneRDD.sortBy(sortFunction, True).collect())
        print(twoRDD.sortBy(sortFunction, True).collect())
        oneSorted1 = oneRDD.takeOrdered(oneRDD.count(), key=sortFunction)
        twoSorted1 = twoRDD.takeOrdered(twoRDD.count(), key=sortFunction)
        print('one is %s' % oneSorted1)
        print('two is %s' % twoSorted1)
        assert oneSorted1 == twoSorted1
    finally:
        # Always release the Spark context, even when an assertion or a Spark
        # action above raises.
        sc.stop()