from pyspark import SparkContext, SparkConf
import os

# PySpark starts a JVM, which is located through JAVA_HOME; it must be set
# before any SparkContext is created, hence at import time.
os.environ['JAVA_HOME'] = "/opt/module/jdk1.8.0_171"


def get_ratings_tuple(entry):
    """Parse one ``::``-delimited ratings line into ``(int, int, float)``.

    Only the first three fields are used; any further fields on the line are
    ignored.  Presumably the fields are (user id, movie id, rating) — confirm
    against the dataset.

    Args:
        entry: a single text line such as ``"1::1193::5::978300760"``.

    Returns:
        tuple: ``(int(field0), int(field1), float(field2))``.
    """
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])


def get_movie_tuple(entry):
    """Parse one ``::``-delimited movies line into ``(int, str)``.

    Returns the first field as an int (presumably the movie id) and the
    second field unchanged (presumably the title); further fields are ignored.

    Args:
        entry: a single text line with at least two ``::``-separated fields.

    Returns:
        tuple: ``(int(field0), field1)``.
    """
    items = entry.split('::')
    return int(items[0]), items[1]


def sortFunction(tuple):
    """Build a string sort key for a ``(number, text)`` pair.

    The number is rendered with ``'%06.3f '`` (zero-padded to width 6, three
    decimals, trailing space) and joined to the text by one more space, so
    sorting by this key orders pairs by number first and text second.  This
    makes the order deterministic even when numbers repeat.

    NOTE(review): because the comparison is on strings, it only matches the
    numeric order while every number formats to the same width, i.e. roughly
    for values in [0, 100). Negative values or values >= 100 sort wrongly.
    The parameter name shadows the builtin ``tuple``; it is kept for
    backward compatibility with keyword callers.

    Args:
        tuple: a 2-item sequence ``(number, str)``.

    Returns:
        str: the composite sort key.
    """
    key = '%06.3f ' % tuple[0]
    value = tuple[1]
    return key + ' ' + value


def init_rdds(sc,
              ratingsFilename="hdfs://master:9000/user/root/als_movie/ratings.txt",
              moviesFilename="hdfs://master:9000/user/root/als_movie/movies.dat",
              numPartitions=2):
    """Load and parse the ratings and movies files into cached RDDs.

    Args:
        sc: the active SparkContext.
        ratingsFilename: path/URI of the ratings text file.
        moviesFilename: path/URI of the movies text file.
        numPartitions: number of partitions the raw ratings are
            repartitioned into before parsing.

    Returns:
        tuple: ``(ratingsRDD, moviesRDD)``. Both are marked with ``cache()``,
        so they are materialized and kept on first use.
    """
    rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
    rawMovies = sc.textFile(moviesFilename)
    ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
    moviesRDD = rawMovies.map(get_movie_tuple).cache()
    return ratingsRDD, moviesRDD


def _summarize_datasets(sc):
    """Load the datasets and print their sizes and first three records."""
    ratingsRDD, moviesRDD = init_rdds(sc)
    ratingsCount = ratingsRDD.count()
    moviesCount = moviesRDD.count()
    print('There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount))
    print('Ratings: %s' % ratingsRDD.take(3))
    print('Movies: %s' % moviesRDD.take(3))


def _sorting_demo(sc):
    """Compare sortByKey, sortBy and takeOrdered on two permutations of the same pairs.

    The asserts check the following:
    - ``sortByKey`` results for the two inputs contain the same pairs.
    - Those results can order equal keys differently.
    - Sorting with ``sortFunction`` as the key yields identical results.
    """
    tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')]
    tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')]
    oneRDD = sc.parallelize(tmp1)
    twoRDD = sc.parallelize(tmp2)

    oneSorted = oneRDD.sortByKey(True).collect()
    twoSorted = twoRDD.sortByKey(True).collect()
    print(oneSorted)
    print(twoSorted)
    assert set(oneSorted) == set(twoSorted)
    # Read the last element by index: the original pop() mutated twoSorted
    # inside an assert, a side effect that vanishes under `python -O`.
    assert twoSorted[0][0] < twoSorted[-1][0]
    # Ties on the key are not broken by value, so the heads differ.
    assert oneSorted[0:2] != twoSorted[0:2]

    print(oneRDD.sortBy(sortFunction, True).collect())
    print(twoRDD.sortBy(sortFunction, True).collect())

    # Take every element, ordered by the composite (key, value) string.
    oneSorted1 = oneRDD.takeOrdered(oneRDD.count(), key=sortFunction)
    twoSorted1 = twoRDD.takeOrdered(twoRDD.count(), key=sortFunction)
    print('one is %s' % oneSorted1)
    print('two is %s' % twoSorted1)
    assert oneSorted1 == twoSorted1


def main():
    """Connect to the cluster, summarize the datasets, and run the sorting demo.

    The SparkContext is stopped in a ``finally`` clause, so it is released
    even when one of the checks fails.
    """
    # Point PySpark's worker and driver Python interpreters at python3.
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

    # Connect to the Spark standalone cluster master at master:7077;
    # the application name is "als_movie".
    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
    sc = SparkContext.getOrCreate(conf)
    try:
        sc.setLogLevel("ERROR")
        _summarize_datasets(sc)
        _sorting_demo(sc)
    finally:
        sc.stop()


if __name__ == "__main__":
    main()