commit 8e93a043823c19d9126876a8d6bc5ced7bdf83e9
Author: fly6516
Date:   Wed Apr 16 01:40:52 2025 +0000

    init code

diff --git a/1-1.py b/1-1.py
new file mode 100644
index 0000000..d5fc7a8
--- /dev/null
+++ b/1-1.py
@@ -0,0 +1,29 @@
+# 1-1.py
+from pyspark import SparkContext
+
+sc = SparkContext()
+
+def parse_line(line):
+    try:
+        parts = line.strip().split('","')
+        id_ = parts[0].replace('"', '')
+        title = parts[1].strip()
+        desc = parts[2].strip()
+        manufacturer = parts[3].strip()
+        content = "{} {} {}".format(title, desc, manufacturer).strip()
+        return (id_, content)
+    except Exception:
+        return None
+
+google_path = "hdfs://master:9000/user/root/Google_small.csv"
+amazon_path = "hdfs://master:9000/user/root/Amazon_small.csv"
+
+google_raw = sc.textFile(google_path)
+amazon_raw = sc.textFile(amazon_path)
+
+google_parsed = google_raw.map(parse_line).filter(lambda x: x is not None and x[0] != 'id')
+amazon_parsed = amazon_raw.map(parse_line).filter(lambda x: x is not None and x[0] != 'id')
+
+
+print("Google ", google_parsed.take(1))
+print("Amazon ", amazon_parsed.take(1))
diff --git a/2-1.py b/2-1.py
new file mode 100644
index 0000000..23ea28d
--- /dev/null
+++ b/2-1.py
@@ -0,0 +1,42 @@
+# 2-1.py
+from pyspark import SparkContext
+
+sc = SparkContext()
+
+stopwords = set(sc.textFile("hdfs://master:9000/user/root/stopwords.txt").collect())
+
+def tokenize(text):
+    import re
+    words = re.findall(r'\w+', text.lower())
+    return [word for word in words if word not in stopwords]
+
+def to_token_rdd(record):
+    return (record[0], tokenize(record[1]))
+
+google_path = "hdfs://master:9000/user/root/Google_small.csv"
+amazon_path = "hdfs://master:9000/user/root/Amazon_small.csv"
+
+def parse_line(line):
+    try:
+        parts = line.strip().split('","')
+        id_ = parts[0].replace('"', '')
+        title = parts[1].strip()
+        desc = parts[2].strip()
+        manufacturer = parts[3].strip()
+        content = "{} {} {}".format(title, desc, manufacturer).strip()
+        return (id_, content)
+    except Exception:
+        return None
+
+google = sc.textFile(google_path).map(parse_line).filter(lambda x: x is not None and x[0] != 'id')
+amazon = sc.textFile(amazon_path).map(parse_line).filter(lambda x: x is not None and x[0] != 'id')
+
+google_tokens = google.map(to_token_rdd)
+amazon_tokens = amazon.map(to_token_rdd)
+
+
+token_count = google_tokens.union(amazon_tokens).flatMap(lambda x: x[1]).count()
+longest_amazon_doc = amazon_tokens.map(lambda x: (x[0], len(x[1]))).takeOrdered(1, key=lambda x: -x[1])
+
+print("total tokens: ", token_count)
+print("longest Amazon record:", longest_amazon_doc)
diff --git a/3-1.py b/3-1.py
new file mode 100644
index 0000000..17856a7
--- /dev/null
+++ b/3-1.py
@@ -0,0 +1,69 @@
+# 3-1.py
+from pyspark import SparkContext
+from collections import defaultdict
+
+sc = SparkContext()
+
+# Rebuild the tokenized corpus from 2-1.py so this script runs on its own
+stopwords = set(sc.textFile("hdfs://master:9000/user/root/stopwords.txt").collect())
+
+def tokenize(text):
+    import re
+    words = re.findall(r'\w+', text.lower())
+    return [word for word in words if word not in stopwords]
+
+def to_token_rdd(record):
+    return (record[0], tokenize(record[1]))
+
+def parse_line(line):
+    try:
+        parts = line.strip().split('","')
+        id_ = parts[0].replace('"', '')
+        title = parts[1].strip()
+        desc = parts[2].strip()
+        manufacturer = parts[3].strip()
+        content = "{} {} {}".format(title, desc, manufacturer).strip()
+        return (id_, content)
+    except Exception:
+        return None
+
+google_path = "hdfs://master:9000/user/root/Google_small.csv"
+amazon_path = "hdfs://master:9000/user/root/Amazon_small.csv"
+
+google_tokens = sc.textFile(google_path).map(parse_line).filter(lambda x: x is not None and x[0] != 'id').map(to_token_rdd)
+amazon_tokens = sc.textFile(amazon_path).map(parse_line).filter(lambda x: x is not None and x[0] != 'id').map(to_token_rdd)
+
+corpus = google_tokens.union(amazon_tokens)
+N = corpus.count()
+
+def compute_tf(record):
+    doc_id, tokens = record
+    tf = defaultdict(float)
+    for token in tokens:
+        tf[token] += 1.0
+    total = float(len(tokens))
+    for key in tf:
+        tf[key] = tf[key] / total
+    return (doc_id, tf)
+
+tf_rdd = corpus.map(compute_tf)
+
+# IDF = N / df(t): total number of documents divided by documents containing the token
+token_docs = corpus.flatMap(lambda x: [(token, x[0]) for token in set(x[1])])
+doc_freq = token_docs.groupByKey().mapValues(lambda x: len(set(x)))
+idf_dict = doc_freq.map(lambda x: (x[0], float(N) / x[1])).collectAsMap()
+idf_bcast = sc.broadcast(idf_dict)
+
+
+def compute_tfidf(record):
+    doc_id, tf_map = record
+    idf_map = idf_bcast.value
+    tfidf = {}
+    for token in tf_map:
+        tfidf[token] = tf_map[token] * idf_map.get(token, 0.0)
+    return (doc_id, tfidf)
+
+tfidf_rdd = tf_rdd.map(compute_tfidf)
+
+
+print("TF-IDF sample: ", tfidf_rdd.take(1))
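
A quick way to sanity-check the TF-IDF logic in 3-1.py is to replay it in plain Python on a toy corpus. The sketch below is illustrative only: the document IDs and tokens are made up, and it mirrors compute_tf and the N / df(t) IDF definition used above without involving Spark.

from collections import defaultdict

# Toy corpus of (doc_id, tokens) pairs, standing in for corpus.collect()
docs = [("g1", ["usb", "cable", "adapter"]),
        ("a1", ["usb", "hub"]),
        ("a2", ["hub", "switch", "hub"])]

def compute_tf(tokens):
    # Term frequency: token count divided by document length
    counts = defaultdict(float)
    for token in tokens:
        counts[token] += 1.0
    total = float(len(tokens))
    return {token: count / total for token, count in counts.items()}

# Document frequency and IDF = N / df(t), matching 3-1.py (no log)
N = len(docs)
df = defaultdict(int)
for _, tokens in docs:
    for token in set(tokens):
        df[token] += 1
idf = {token: float(N) / count for token, count in df.items()}

tfidf = {doc_id: {t: w * idf[t] for t, w in compute_tf(tokens).items()}
         for doc_id, tokens in docs}
print(tfidf["a2"])  # "hub": tf 2/3 * idf 3/2 = 1.0; "switch": tf 1/3 * idf 3 = 1.0

If the weights printed by tfidf_rdd.take(1) follow the same pattern, with rarer tokens receiving larger weights, the Spark pipeline is behaving as intended.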