# 3-1.py
# Compute TF-IDF weights for every token in the combined corpus.
from pyspark import SparkContext
from collections import defaultdict

sc = SparkContext()

# google_tokens and amazon_tokens are assumed to be RDDs of
# (doc_id, [token, ...]) pairs built in an earlier step.
corpus = google_tokens.union(amazon_tokens)
N = corpus.count()

def compute_tf(record):
    """Return (doc_id, {token: term frequency}) for one document."""
    doc_id, tokens = record
    tf = defaultdict(float)
    for token in tokens:
        tf[token] += 1.0
    total = float(len(tokens))
    for key in tf:
        tf[key] = tf[key] / total
    return (doc_id, tf)

tf_rdd = corpus.map(compute_tf)

# Map each token to the documents it appears in, then count distinct
# documents per token to get the document frequency.
token_docs = corpus.flatMap(lambda x: [(token, x[0]) for token in set(x[1])])
doc_freq = token_docs.groupByKey().mapValues(lambda x: len(set(x)))

# IDF here is N / df (no log), collected to the driver and broadcast
# to all workers as a plain dict.
idf_dict = doc_freq.map(lambda x: (x[0], float(N) / x[1])).collectAsMap()
idf_bcast = sc.broadcast(idf_dict)

def compute_tfidf(record):
    """Multiply each token's TF by its broadcast IDF weight."""
    doc_id, tf_map = record
    idf_map = idf_bcast.value
    tfidf = {}
    for token in tf_map:
        tfidf[token] = tf_map[token] * idf_map.get(token, 0.0)
    return (doc_id, tfidf)

tfidf_rdd = tf_rdd.map(compute_tfidf)

print("TF-IDF sample: ", tfidf_rdd.take(1))
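
# --- Optional sanity check (a sketch; assumes the pipeline above has run) ---
# After collectAsMap(), idf_dict is an ordinary Python dict on the driver,
# so it can be sorted locally to confirm that rare tokens (those appearing
# in few documents) receive the largest IDF weights.
rare_tokens = sorted(idf_dict.items(), key=lambda kv: -kv[1])[:5]
print("Highest-IDF tokens:", rare_tokens)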