"""Load the als_movie ratings and movies files into RDDs and exercise RDD sorting.

Run as a script, this connects to the Spark standalone master, parses the two
'::'-delimited input files from HDFS, prints a few sample records, and then
runs a small self-check comparing ``sortByKey``, ``sortBy`` and ``takeOrdered``
on two differently ordered copies of the same data.
"""
import os

from pyspark import SparkConf, SparkContext

# Set at import time so the values are in place when the SparkContext is
# created further down.
os.environ['JAVA_HOME'] = "/opt/module/jdk1.8.0_171"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

# Default input locations used by init_rdds().
RATINGS_PATH = "hdfs://master:9000/user/root/als_movie/ratings.txt"
MOVIES_PATH = "hdfs://master:9000/user/root/als_movie/movies.dat"


def get_ratings_tuple(entry):
    """Parse one '::'-delimited ratings line.

    Args:
        entry: A text line with at least three '::'-separated fields.

    Returns:
        ``(int, int, float)`` built from the first three fields (presumably
        user id, movie id and rating -- confirm against the data file).
        Any further fields are ignored.

    Raises:
        IndexError: If the line has fewer than three fields.
        ValueError: If a field cannot be converted to its numeric type.
    """
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])


def get_movie_tuple(entry):
    """Parse one '::'-delimited movies line.

    Args:
        entry: A text line with at least two '::'-separated fields.

    Returns:
        ``(int, str)`` built from the first two fields (presumably movie id
        and title -- confirm against the data file).

    Raises:
        IndexError: If the line has fewer than two fields.
        ValueError: If the first field is not an integer.
    """
    items = entry.split('::')
    return int(items[0]), items[1]


def sortFunction(tuple):
    """Build a string sort key from a ``(number, str)`` pair.

    The number is zero-padded to a fixed width so that comparing the
    resulting strings orders pairs by number first and by the string second.

    Args:
        tuple: A two-element sequence ``(number, str)``. The parameter name
            shadows the builtin but is kept for backward compatibility.

    Returns:
        The formatted number, two spaces, then the string element.
    """
    key = '%06.3f ' % tuple[0]
    value = tuple[1]
    return key + ' ' + value


def init_rdds(sc, *, ratings_path=RATINGS_PATH, movies_path=MOVIES_PATH,
              num_partitions=2):
    """Read and parse the ratings and movies files into cached RDDs.

    Args:
        sc: An active SparkContext.
        ratings_path: Location of the ratings text file.
        movies_path: Location of the movies text file.
        num_partitions: Number of partitions the raw ratings are
            repartitioned into before parsing.

    Returns:
        ``(ratingsRDD, moviesRDD)``: RDDs of the tuples produced by
        ``get_ratings_tuple`` and ``get_movie_tuple`` respectively.
    """
    raw_ratings = sc.textFile(ratings_path).repartition(num_partitions)
    raw_movies = sc.textFile(movies_path)

    ratings_rdd = raw_ratings.map(get_ratings_tuple).cache()
    movies_rdd = raw_movies.map(get_movie_tuple).cache()

    return ratings_rdd, movies_rdd


def main():
    """Connect to the cluster, load the datasets and run the sorting checks."""
    # Connect to the Spark standalone cluster master at master:7077;
    # the application name is "als_movie".
    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
    sc = SparkContext.getOrCreate(conf)
    try:
        sc.setLogLevel("ERROR")

        ratings_rdd, movies_rdd = init_rdds(sc)

        ratings_count = ratings_rdd.count()
        movies_count = movies_rdd.count()

        print('There are %s ratings and %s movies in the datasets' % (ratings_count, movies_count))
        print('Ratings: %s' % ratings_rdd.take(3))
        print('Movies: %s' % movies_rdd.take(3))

        # Same six pairs in two different input orders.
        tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')]
        tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')]

        one_rdd = sc.parallelize(tmp1)
        two_rdd = sc.parallelize(tmp2)

        # sortByKey orders by key only, so the order among equal keys can
        # differ between the two inputs.
        one_sorted = one_rdd.sortByKey(True).collect()
        two_sorted = two_rdd.sortByKey(True).collect()
        print(one_sorted)
        print(two_sorted)
        assert set(one_sorted) == set(two_sorted)
        # Index the last element instead of pop() so the assertion has no
        # side effect on the list.
        assert two_sorted[0][0] < two_sorted[-1][0]
        assert one_sorted[0:2] != two_sorted[0:2]

        # Sorting on the combined key/value string gives one total order.
        print(one_rdd.sortBy(sortFunction, True).collect())
        print(two_rdd.sortBy(sortFunction, True).collect())

        one_sorted1 = one_rdd.takeOrdered(one_rdd.count(), key=sortFunction)
        two_sorted1 = two_rdd.takeOrdered(two_rdd.count(), key=sortFunction)
        print('one is %s' % one_sorted1)
        print('two is %s' % two_sorted1)
        assert one_sorted1 == two_sorted1
    finally:
        # Always release the context, even when an action or assertion fails.
        sc.stop()


if __name__ == "__main__":
    main()