diff --git a/basic_re.py b/basic_re.py
index 5ab0bde..fccca8e 100644
--- a/basic_re.py
+++ b/basic_re.py
@@ -24,7 +24,8 @@ if __name__ == "__main__":
     movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(lambda rec: getCountsAndAverages(rec))
     print('movieIDsWithAvgRatingsRDD1: %s\n' % movieIDsWithAvgRatingsRDD.take(3))
 
-    movieNameWithAvgRatingsRDD = (moviesRDD.join(movieIDsWithAvgRatingsRDD).map(lambda movie: (movie[1][1][1], movie[1][0], movie[1][1][0])))
+    movieNameWithAvgRatingsRDD = (moviesRDD.join(movieIDsWithAvgRatingsRDD)
+                                  .map(lambda movie: (movie[1][1][1], movie[1][0], movie[1][1][0])))
     print('movieNameWithAvgRatingsRDD2: %s\n' % movieNameWithAvgRatingsRDD.take(3))
 
     movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
diff --git a/data_prepare-0.py b/data_prepare-0.py
deleted file mode 100644
index 0b99758..0000000
--- a/data_prepare-0.py
+++ /dev/null
@@ -1,73 +0,0 @@
-from pyspark import SparkContext, SparkConf
-import os
-os.environ['JAVA_HOME'] = "/opt/module/jdk1.8.0_171"
-os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
-os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/bin/python3"
-
-def get_ratings_tuple(entry):
-    items = entry.split('::')
-    return int(items[0]), int(items[1]), float(items[2])
-
-def get_movie_tuple(entry):
-    items = entry.split('::')
-    return int(items[0]), items[1]
-
-def sortFunction(tuple):
-    key = str('%06.3f ' % tuple[0])
-    value = tuple[1]
-    return (key + ' ' + value)
-
-def init_rdds(sc):
-    ratingsFilename = "hdfs://master:9000/user/root/als_movie/ratings.txt"
-    moviesFilename = "hdfs://master:9000/user/root/als_movie/movies.dat"
-
-    numPartitions = 2
-    rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
-    rawMovies = sc.textFile(moviesFilename)
-
-    ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
-    moviesRDD = rawMovies.map(get_movie_tuple).cache()
-
-    return ratingsRDD, moviesRDD
-
-if __name__ == "__main__":
-    import sys, os
-    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
-    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
-
-    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
-    # Connect to the master of the Spark standalone cluster at master:7077; application name als_movie
-    sc = SparkContext.getOrCreate(conf)
-    sc.setLogLevel("ERROR")
-
-    ratingsRDD, moviesRDD = init_rdds(sc)
-
-    ratingsCount = ratingsRDD.count()
-    moviesCount = moviesRDD.count()
-
-    print('There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount))
-    print('Ratings: %s' % ratingsRDD.take(3))
-    print('Movies: %s' % moviesRDD.take(3))
-
-    tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')]
-    tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')]
-
-    oneRDD = sc.parallelize(tmp1)
-    twoRDD = sc.parallelize(tmp2)
-    oneSorted = oneRDD.sortByKey(True).collect()
-    twoSorted = twoRDD.sortByKey(True).collect()
-    print(oneSorted)
-    print(twoSorted)
-    assert set(oneSorted) == set(twoSorted)
-    assert twoSorted[0][0] < twoSorted.pop()[0]
-    assert oneSorted[0:2] != twoSorted[0:2]
-
-    print(oneRDD.sortBy(sortFunction, True).collect())
-    print(twoRDD.sortBy(sortFunction, True).collect())
-
-    oneSorted1 = oneRDD.takeOrdered(oneRDD.count(), key=sortFunction)
-    twoSorted1 = twoRDD.takeOrdered(twoRDD.count(), key=sortFunction)
-    print('one is %s' % oneSorted1)
-    print('two is %s' % twoSorted1)
-    assert oneSorted1 == twoSorted1
-    sc.stop()
diff --git a/data_prepare.py b/data_prepare.py
index 1e8cb75..ed6a300 100644
--- a/data_prepare.py
+++ b/data_prepare.py
@@ -1,42 +1,71 @@
 from pyspark import SparkContext, SparkConf
 import os
+os.environ['JAVA_HOME'] = "/opt/module/jdk1.8.0_171"
-# Set Java environment variable
-os.environ['JAVA_HOME'] = '/opt/module/jdk1.8.0_171'
-os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
-os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/bin/python3"
-
-# Parse ratings data into (userID, movieID, rating)
 def get_ratings_tuple(entry):
-    user, movie, rating, _ = entry.split('::')
-    return int(user), int(movie), float(rating)
+    items = entry.split('::')
+    return int(items[0]), int(items[1]), float(items[2])
 
-# Parse movies data into (movieID, title)
 def get_movie_tuple(entry):
-    mid, title, _ = entry.split('::')
-    return int(mid), title
+    items = entry.split('::')
+    return int(items[0]), items[1]
 
-# Generate deterministic key for sorting
-def sort_key(rec):
-    score, name = rec
-    return f"{score:06.3f} {name}"
+def sortFunction(tuple):
+    key = str('%06.3f ' % tuple[0])
+    value = tuple[1]
+    return (key + ' ' + value)
 
-# Initialize and return ratingsRDD, moviesRDD
-def init_rdds(sc, hdfs_base='hdfs://master:9000/user/root/als_movie'):
-    ratings_path = f"{hdfs_base}/ratings.txt"
-    movies_path = f"{hdfs_base}/movies.dat"
-    raw_r = sc.textFile(ratings_path).repartition(2)
-    raw_m = sc.textFile(movies_path)
-    ratings_rdd = raw_r.map(get_ratings_tuple).cache()
-    movies_rdd = raw_m.map(get_movie_tuple).cache()
-    return ratings_rdd, movies_rdd
+def init_rdds(sc):
+    ratingsFilename = "hdfs://master:9000/user/root/als_movie/ratings.txt"
+    moviesFilename = "hdfs://master:9000/user/root/als_movie/movies.dat"
 
-if __name__ == '__main__':
-    conf = SparkConf().setMaster('spark://master:7077').setAppName('als_movie')
+    numPartitions = 2
+    rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
+    rawMovies = sc.textFile(moviesFilename)
+
+    ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
+    moviesRDD = rawMovies.map(get_movie_tuple).cache()
+
+    return ratingsRDD, moviesRDD
+
+if __name__ == "__main__":
+    import sys, os
+    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
+    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
+
+    conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
+    # Connect to the master of the Spark standalone cluster at master:7077; application name als_movie
     sc = SparkContext.getOrCreate(conf)
-    sc.setLogLevel('ERROR')
+    sc.setLogLevel("ERROR")
 
-    rdd_ratings, rdd_movies = init_rdds(sc)
-    print(f"Ratings count: {rdd_ratings.count()}")
-    print(f"Movies count: {rdd_movies.count()}")
-    sc.stop()
\ No newline at end of file
+    ratingsRDD, moviesRDD = init_rdds(sc)
+
+    ratingsCount = ratingsRDD.count()
+    moviesCount = moviesRDD.count()
+
+    print('There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount))
+    print('Ratings: %s' % ratingsRDD.take(3))
+    print('Movies: %s' % moviesRDD.take(3))
+
+    tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')]
+    tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')]
+
+    oneRDD = sc.parallelize(tmp1)
+    twoRDD = sc.parallelize(tmp2)
+    oneSorted = oneRDD.sortByKey(True).collect()
+    twoSorted = twoRDD.sortByKey(True).collect()
+    print(oneSorted)
+    print(twoSorted)
+    assert set(oneSorted) == set(twoSorted)
+    assert twoSorted[0][0] < twoSorted.pop()[0]
+    assert oneSorted[0:2] != twoSorted[0:2]
+
+    print(oneRDD.sortBy(sortFunction, True).collect())
+    print(twoRDD.sortBy(sortFunction, True).collect())
+
+    oneSorted1 = oneRDD.takeOrdered(oneRDD.count(), key=sortFunction)
+    twoSorted1 = twoRDD.takeOrdered(twoRDD.count(), key=sortFunction)
+    print('one is %s' % oneSorted1)
+    print('two is %s' % twoSorted1)
+    assert oneSorted1 == twoSorted1
+    sc.stop()
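
Note on the basic_re.py change: splitting the join/map chain across two lines is purely cosmetic; the record shape flowing through the pipeline is unchanged. A minimal sketch of the assumed shapes, presuming getCountsAndAverages returns (movieID, (ratingCount, averageRating)) as the index pattern suggests; the sample tuple below is hypothetical, not taken from the dataset:

    # moviesRDD element:                  (movieID, title)
    # movieIDsWithAvgRatingsRDD element:  (movieID, (ratingCount, averageRating))   -- assumed
    # after moviesRDD.join(...):          (movieID, (title, (ratingCount, averageRating)))
    movie = (123, ('Some Title', (40, 3.75)))            # hypothetical joined record
    row = (movie[1][1][1], movie[1][0], movie[1][1][0])
    # row == (3.75, 'Some Title', 40), i.e. (averageRating, title, ratingCount),
    # which is the layout movieLimitedAndSortedByRatingRDD later sorts by rating.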