BD-exp-9/3-1.py

67 lines
2.1 KiB
Python
Raw Normal View History

# coding=utf-8
2025-04-16 01:40:52 +00:00
from pyspark import SparkContext
import csv
import re
2025-04-16 01:40:52 +00:00
# Python 3.5 has no f-strings, so str.format() is used for string formatting instead.
def tokenize(text):
    """Split *text* into lowercase word tokens.

    The text is lowercased first; a token is then any maximal run of regex
    word characters (letters, digits and underscore), so punctuation and
    whitespace act purely as separators.
    """
    lowered = text.lower()
    word_pattern = re.compile(r'\w+')
    return word_pattern.findall(lowered)
2025-04-16 01:40:52 +00:00
def load_stopwords(sc, hdfs_path="hdfs://master:9000/user/root/stopwords.txt",
                   local_path="stopwords.txt"):
    """Load the stopword list as a set of lowercase words.

    Reads one word per line from *hdfs_path* through the Spark context. If
    that read fails for any reason, falls back to the local file *local_path*.

    Args:
        sc: the active SparkContext (only ``textFile`` is used).
        hdfs_path: HDFS location of the stopword file.
        local_path: local fallback file, decoded as UTF-8.

    Returns:
        A set of stripped, lowercased, non-empty words.
    """
    try:
        # textFile() is lazy; collect() is what actually reads HDFS, so it
        # must stay inside the try for the fallback to trigger.
        lines = sc.textFile(hdfs_path).collect()
    except Exception:
        # Best-effort fallback (e.g. HDFS unreachable or file missing).
        # Exception rather than a bare except, so Ctrl-C still aborts.
        with open(local_path, "r", encoding="utf-8") as f:
            lines = f.readlines()
    # Normalize both sources the same way. tokenize() lowercases its output,
    # so stopwords must be lowercase too. Stray spaces or '\r' from CRLF
    # files would otherwise prevent any match.
    return {word for word in (line.strip().lower() for line in lines) if word}
2025-04-16 01:40:52 +00:00
def parse_csv_line(line):
    """Split a single CSV line into its list of field strings.

    The csv module is used instead of ``str.split(',')`` so that quoted
    fields containing commas (and doubled quotes inside quotes) are parsed
    correctly. Any csv.Error raised while parsing propagates to the caller.
    """
    row_iter = iter(csv.reader([line]))
    first_row = next(row_iter)
    return first_row
2025-04-16 01:40:52 +00:00
def extract_info(line, source):
    """Parse one product CSV line into a ``(product_id, text)`` pair.

    Both input files are read with the same column positions:
      * Google: id, name, description, manufacturer, ...
      * Amazon: id, title, description, manufacturer, ...
    *source* therefore does not change which columns are used. It is kept
    so existing callers keep working.

    Args:
        line: one raw line of a product CSV file.
        source: dataset label, e.g. "google" or "amazon".

    Returns:
        ``(pid, text)``. *pid* is the stripped first column and *text* is
        columns 2-4 joined by single spaces. Returns ``(None, None)`` for
        lines that should be skipped: blank lines, the header row, rows
        with fewer than four columns or an empty id, and lines the csv
        module rejects.
    """
    if not line.strip():
        return (None, None)
    try:
        fields = parse_csv_line(line)
    except (csv.Error, StopIteration):
        # Malformed row (e.g. a field over csv.field_size_limit()): skip it
        # rather than failing the whole Spark job.
        return (None, None)
    if len(fields) < 4:
        return (None, None)
    pid = fields[0].strip()
    # sc.textFile() hands over the header line like any other row. Without
    # this check its column names would be indexed under a fake id "id".
    if not pid or pid.lower() == "id":
        return (None, None)
    return (pid, " ".join(fields[1:4]))
2025-04-16 01:40:52 +00:00
def _word_pid_pairs(record, stopwords):
    """Return the distinct (word, product_id) pairs for one record.

    Args:
        record: a ``(pid, text)`` tuple as produced by extract_info().
        stopwords: collection of words to leave out of the index.
    """
    pid, text = record
    # Deduplicate words within one product before the shuffle. A word that
    # appears several times in the same text needs only one (word, pid) pair.
    return [(word, pid) for word in set(tokenize(text)) if word not in stopwords]


def main():
    """Build a word -> product-id inverted index over Google.csv and Amazon.csv.

    Reads both CSV files and the stopword list from HDFS, tokenizes each
    product's text, and writes one ``(word, [pid, ...])`` record per word
    back to HDFS.
    """
    hdfs_root = "hdfs://master:9000/user/root/"
    sc = SparkContext(appName="InvertedIndex")
    try:
        # Broadcast the stopword set once, instead of serializing it into
        # every task closure.
        stopwords = sc.broadcast(load_stopwords(sc))

        # Parse both datasets into (pid, text) and drop unusable rows.
        google_rdd = (sc.textFile(hdfs_root + "Google.csv")
                      .map(lambda line: extract_info(line, "google"))
                      .filter(lambda rec: rec[0] is not None))
        amazon_rdd = (sc.textFile(hdfs_root + "Amazon.csv")
                      .map(lambda line: extract_info(line, "amazon"))
                      .filter(lambda rec: rec[0] is not None))

        # Inverted index over both datasets: word -> de-duplicated product
        # ids, sorted so the saved output is deterministic.
        inverted_index = (google_rdd.union(amazon_rdd)
                          .flatMap(lambda rec: _word_pid_pairs(rec, stopwords.value))
                          .groupByKey()
                          .mapValues(lambda ids: sorted(set(ids))))

        # NOTE: Spark refuses to overwrite an existing output directory;
        # remove it before re-running.
        inverted_index.saveAsTextFile(hdfs_root + "output/inverted_index")
    finally:
        # Release the Spark context even when the job fails.
        sc.stop()


if __name__ == "__main__":
    main()