style(basic_re.py): 优化代码格式
- 删除多余的空行 - 将长行代码进行适当折行,提高可读性
This commit is contained in:
parent
d9d9e2603e
commit
90d4056157
@ -24,7 +24,8 @@ if __name__ == "__main__":
|
|||||||
movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(lambda rec: getCountsAndAverages(rec))
|
movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(lambda rec: getCountsAndAverages(rec))
|
||||||
print('movieIDsWithAvgRatingsRDD1: %s\n' % movieIDsWithAvgRatingsRDD.take(3))
|
print('movieIDsWithAvgRatingsRDD1: %s\n' % movieIDsWithAvgRatingsRDD.take(3))
|
||||||
|
|
||||||
movieNameWithAvgRatingsRDD = (moviesRDD.join(movieIDsWithAvgRatingsRDD).map(lambda movie: (movie[1][1][1], movie[1][0], movie[1][1][0])))
|
movieNameWithAvgRatingsRDD = (moviesRDD.join(movieIDsWithAvgRatingsRDD)
|
||||||
|
.map(lambda movie: (movie[1][1][1], movie[1][0], movie[1][1][0])))
|
||||||
print('movieNameWithAvgRatingsRDD2: %s\n' % movieNameWithAvgRatingsRDD.take(3))
|
print('movieNameWithAvgRatingsRDD2: %s\n' % movieNameWithAvgRatingsRDD.take(3))
|
||||||
|
|
||||||
movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
|
movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
|
||||||
|
@ -1,73 +0,0 @@
|
|||||||
from pyspark import SparkContext, SparkConf
|
|
||||||
import os
|
|
||||||
os.environ['JAVA_HOME'] = "/opt/module/jdk1.8.0_171"
|
|
||||||
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
|
|
||||||
os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/bin/python3"
|
|
||||||
|
|
||||||
def get_ratings_tuple(entry):
|
|
||||||
items = entry.split('::')
|
|
||||||
return int(items[0]), int(items[1]), float(items[2])
|
|
||||||
|
|
||||||
def get_movie_tuple(entry):
|
|
||||||
items = entry.split('::')
|
|
||||||
return int(items[0]), items[1]
|
|
||||||
|
|
||||||
def sortFunction(tuple):
|
|
||||||
key = str('%06.3f ' % tuple[0])
|
|
||||||
value = tuple[1]
|
|
||||||
return (key + ' ' + value)
|
|
||||||
|
|
||||||
def init_rdds(sc):
|
|
||||||
ratingsFilename = "hdfs://master:9000/user/root/als_movie/ratings.txt"
|
|
||||||
moviesFilename = "hdfs://master:9000/user/root/als_movie/movies.dat"
|
|
||||||
|
|
||||||
numPartitions = 2
|
|
||||||
rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
|
|
||||||
rawMovies = sc.textFile(moviesFilename)
|
|
||||||
|
|
||||||
ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
|
|
||||||
moviesRDD = rawMovies.map(get_movie_tuple).cache()
|
|
||||||
|
|
||||||
return ratingsRDD, moviesRDD
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
import sys, os
|
|
||||||
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
|
|
||||||
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
|
|
||||||
|
|
||||||
conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
|
|
||||||
#连接到Spark独立集群的主节点(Master),地址为master:7077;任务名als_movie
|
|
||||||
sc = SparkContext.getOrCreate(conf)
|
|
||||||
sc.setLogLevel("ERROR")
|
|
||||||
|
|
||||||
ratingsRDD, moviesRDD = init_rdds(sc)
|
|
||||||
|
|
||||||
ratingsCount = ratingsRDD.count()
|
|
||||||
moviesCount = moviesRDD.count()
|
|
||||||
|
|
||||||
print('There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount))
|
|
||||||
print('Ratings: %s' % ratingsRDD.take(3))
|
|
||||||
print('Movies: %s' % moviesRDD.take(3))
|
|
||||||
|
|
||||||
tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')]
|
|
||||||
tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')]
|
|
||||||
|
|
||||||
oneRDD = sc.parallelize(tmp1)
|
|
||||||
twoRDD = sc.parallelize(tmp2)
|
|
||||||
oneSorted = oneRDD.sortByKey(True).collect()
|
|
||||||
twoSorted = twoRDD.sortByKey(True).collect()
|
|
||||||
print(oneSorted)
|
|
||||||
print(twoSorted)
|
|
||||||
assert set(oneSorted) == set(twoSorted)
|
|
||||||
assert twoSorted[0][0] < twoSorted.pop()[0]
|
|
||||||
assert oneSorted[0:2] != twoSorted[0:2]
|
|
||||||
|
|
||||||
print(oneRDD.sortBy(sortFunction, True).collect())
|
|
||||||
print(twoRDD.sortBy(sortFunction, True).collect())
|
|
||||||
|
|
||||||
oneSorted1 = oneRDD.takeOrdered(oneRDD.count(), key=sortFunction)
|
|
||||||
twoSorted1 = twoRDD.takeOrdered(twoRDD.count(), key=sortFunction)
|
|
||||||
print('one is %s' % oneSorted1)
|
|
||||||
print('two is %s' % twoSorted1)
|
|
||||||
assert oneSorted1 == twoSorted1
|
|
||||||
sc.stop()
|
|
@ -1,42 +1,71 @@
|
|||||||
from pyspark import SparkContext, SparkConf
|
from pyspark import SparkContext, SparkConf
|
||||||
import os
|
import os
|
||||||
|
os.environ['JAVA_HOME'] = "/opt/module/jdk1.8.0_171"
|
||||||
|
|
||||||
# Set Java environment variable
|
|
||||||
os.environ['JAVA_HOME'] = '/opt/module/jdk1.8.0_171'
|
|
||||||
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
|
|
||||||
os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/bin/python3"
|
|
||||||
|
|
||||||
# Parse ratings data into (userID, movieID, rating)
|
|
||||||
def get_ratings_tuple(entry):
|
def get_ratings_tuple(entry):
|
||||||
user, movie, rating, _ = entry.split('::')
|
items = entry.split('::')
|
||||||
return int(user), int(movie), float(rating)
|
return int(items[0]), int(items[1]), float(items[2])
|
||||||
|
|
||||||
# Parse movies data into (movieID, title)
|
|
||||||
def get_movie_tuple(entry):
|
def get_movie_tuple(entry):
|
||||||
mid, title, _ = entry.split('::')
|
items = entry.split('::')
|
||||||
return int(mid), title
|
return int(items[0]), items[1]
|
||||||
|
|
||||||
# Generate deterministic key for sorting
|
def sortFunction(tuple):
|
||||||
def sort_key(rec):
|
key = str('%06.3f ' % tuple[0])
|
||||||
score, name = rec
|
value = tuple[1]
|
||||||
return f"{score:06.3f} {name}"
|
return (key + ' ' + value)
|
||||||
|
|
||||||
# Initialize and return ratingsRDD, moviesRDD
|
def init_rdds(sc):
|
||||||
def init_rdds(sc, hdfs_base='hdfs://master:9000/user/root/als_movie'):
|
ratingsFilename = "hdfs://master:9000/user/root/als_movie/ratings.txt"
|
||||||
ratings_path = f"{hdfs_base}/ratings.txt"
|
moviesFilename = "hdfs://master:9000/user/root/als_movie/movies.dat"
|
||||||
movies_path = f"{hdfs_base}/movies.dat"
|
|
||||||
raw_r = sc.textFile(ratings_path).repartition(2)
|
|
||||||
raw_m = sc.textFile(movies_path)
|
|
||||||
ratings_rdd = raw_r.map(get_ratings_tuple).cache()
|
|
||||||
movies_rdd = raw_m.map(get_movie_tuple).cache()
|
|
||||||
return ratings_rdd, movies_rdd
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
numPartitions = 2
|
||||||
conf = SparkConf().setMaster('spark://master:7077').setAppName('als_movie')
|
rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
|
||||||
|
rawMovies = sc.textFile(moviesFilename)
|
||||||
|
|
||||||
|
ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
|
||||||
|
moviesRDD = rawMovies.map(get_movie_tuple).cache()
|
||||||
|
|
||||||
|
return ratingsRDD, moviesRDD
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import sys, os
|
||||||
|
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
|
||||||
|
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
|
||||||
|
|
||||||
|
conf = SparkConf().setMaster("spark://master:7077").setAppName("als_movie")
|
||||||
|
#连接到Spark独立集群的主节点(Master),地址为master:7077;任务名als_movie
|
||||||
sc = SparkContext.getOrCreate(conf)
|
sc = SparkContext.getOrCreate(conf)
|
||||||
sc.setLogLevel('ERROR')
|
sc.setLogLevel("ERROR")
|
||||||
|
|
||||||
rdd_ratings, rdd_movies = init_rdds(sc)
|
ratingsRDD, moviesRDD = init_rdds(sc)
|
||||||
print(f"Ratings count: {rdd_ratings.count()}")
|
|
||||||
print(f"Movies count: {rdd_movies.count()}")
|
ratingsCount = ratingsRDD.count()
|
||||||
sc.stop()
|
moviesCount = moviesRDD.count()
|
||||||
|
|
||||||
|
print('There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount))
|
||||||
|
print('Ratings: %s' % ratingsRDD.take(3))
|
||||||
|
print('Movies: %s' % moviesRDD.take(3))
|
||||||
|
|
||||||
|
tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')]
|
||||||
|
tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')]
|
||||||
|
|
||||||
|
oneRDD = sc.parallelize(tmp1)
|
||||||
|
twoRDD = sc.parallelize(tmp2)
|
||||||
|
oneSorted = oneRDD.sortByKey(True).collect()
|
||||||
|
twoSorted = twoRDD.sortByKey(True).collect()
|
||||||
|
print(oneSorted)
|
||||||
|
print(twoSorted)
|
||||||
|
assert set(oneSorted) == set(twoSorted)
|
||||||
|
assert twoSorted[0][0] < twoSorted.pop()[0]
|
||||||
|
assert oneSorted[0:2] != twoSorted[0:2]
|
||||||
|
|
||||||
|
print(oneRDD.sortBy(sortFunction, True).collect())
|
||||||
|
print(twoRDD.sortBy(sortFunction, True).collect())
|
||||||
|
|
||||||
|
oneSorted1 = oneRDD.takeOrdered(oneRDD.count(), key=sortFunction)
|
||||||
|
twoSorted1 = twoRDD.takeOrdered(twoRDD.count(), key=sortFunction)
|
||||||
|
print('one is %s' % oneSorted1)
|
||||||
|
print('two is %s' % twoSorted1)
|
||||||
|
assert oneSorted1 == twoSorted1
|
||||||
|
sc.stop()
|
||||||
|
Loading…
Reference in New Issue
Block a user