from pyspark import SparkContext, SparkConf
import os

# Point Spark workers at the cluster's Java installation
os.environ['JAVA_HOME'] = '/opt/module/jdk1.8.0_171'


# Parse a ratings line ("user::movie::rating::timestamp") into
# (userID, movieID, rating); the timestamp is discarded
def get_ratings_tuple(entry):
    user, movie, rating, _ = entry.split('::')
    return int(user), int(movie), float(rating)


# Parse a movies line ("movieID::title::genres") into (movieID, title);
# the genres field is discarded
def get_movie_tuple(entry):
    mid, title, _ = entry.split('::')
    return int(mid), title


# Generate a deterministic string key for sorting (score, name) pairs:
# the zero-padded score makes lexicographic order match numeric order,
# and the appended name breaks score ties consistently across runs
def sort_key(rec):
    score, name = rec
    return f"{score:06.3f} {name}"


# Load the ratings and movies files from HDFS, parse them, and return
# the cached RDDs (ratingsRDD, moviesRDD)
def init_rdds(sc, hdfs_base='hdfs://master:9000/user/root/als_movie'):
    ratings_path = f"{hdfs_base}/ratings.txt"
    movies_path = f"{hdfs_base}/movies.dat"
    raw_r = sc.textFile(ratings_path).repartition(2)
    raw_m = sc.textFile(movies_path)
    # Cache both RDDs: every action that follows reuses them
    ratings_rdd = raw_r.map(get_ratings_tuple).cache()
    movies_rdd = raw_m.map(get_movie_tuple).cache()
    return ratings_rdd, movies_rdd


if __name__ == '__main__':
    conf = SparkConf().setMaster('spark://master:7077').setAppName('als_movie')
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel('ERROR')  # suppress INFO/WARN noise on the driver
    rdd_ratings, rdd_movies = init_rdds(sc)
    print(f"Ratings count: {rdd_ratings.count()}")
    print(f"Movies count: {rdd_movies.count()}")
    sc.stop()
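
# sort_key usage sketch (hypothetical sample data, for illustration only):
# the zero-padded score sorts numerically and the appended title breaks
# ties, so repeated runs return the same order, e.g. before sc.stop():
#
#     sample = sc.parallelize([(4.5, 'Movie B'), (4.5, 'Movie A'), (3.0, 'C')])
#     print(sample.sortBy(sort_key, ascending=False).collect())
#     # [(4.5, 'Movie B'), (4.5, 'Movie A'), (3.0, 'C')]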