- 将 HDFS 文件路径从相对路径改为绝对路径 - 新路径为 "hdfs://master:9000/user/root/stopwords.txt" - 此修改提高了文件访问的准确性和可靠性
67 lines
2.1 KiB
Python
67 lines
2.1 KiB
Python
# coding=utf-8
|
||
from pyspark import SparkContext
|
||
import csv
|
||
import re
|
||
|
||
# Python 3.5 没有 f-string,使用 format
|
||
def tokenize(text):
    """Split *text* into lowercase word tokens.

    The input is lowercased, then every maximal run of regex word
    characters is returned in order of appearance. Punctuation and
    whitespace act as separators and are discarded.
    """
    lowered = text.lower()
    return [match.group() for match in re.finditer(r'\w+', lowered)]
|
||
|
||
def load_stopwords(sc, hdfs_path="hdfs://master:9000/user/root/stopwords.txt",
                   local_path="stopwords.txt"):
    """Return the set of stop words, read from HDFS with a local fallback.

    Args:
        sc: Object exposing ``textFile(path).collect()`` (a SparkContext).
        hdfs_path: Location of the stop-word list, one word per line.
        local_path: File opened relative to the current working directory
            when reading ``hdfs_path`` fails.

    Returns:
        A set of stop words with surrounding whitespace removed and blank
        lines dropped, whichever source they came from.

    Raises:
        OSError: If the HDFS read fails and ``local_path`` cannot be opened.
    """
    import logging  # local import so the block does not depend on file-level imports

    try:
        lines = sc.textFile(hdfs_path).collect()
    except Exception as err:
        # Deliberate best-effort fallback, but no longer silent and no longer
        # swallowing KeyboardInterrupt / SystemExit like a bare ``except:``.
        logging.getLogger(__name__).warning(
            "Could not read stop words from %s (%s); falling back to %s",
            hdfs_path, err, local_path)
        # Explicit UTF-8 so the result does not depend on the driver's locale.
        with open(local_path, "r", encoding="utf-8") as f:
            lines = f.readlines()

    # Normalise both sources the same way: strip whitespace, drop empty lines.
    return {word for word in (line.strip() for line in lines) if word}
|
||
|
||
def parse_csv_line(line):
    """Parse a single CSV record and return its fields as a list of strings.

    The line is handed to ``csv.reader`` so that quoted fields containing
    commas are kept together instead of being split.
    """
    single_row = csv.reader((line,))
    return next(single_row)
|
||
|
||
def extract_info(line, source):
    """Turn one raw CSV line into a ``(product_id, text)`` pair.

    Args:
        line: One raw CSV record.
        source: Dataset label ("google" or anything else). Both layouts use
            column 0 for the id and columns 1-3 for the text, so the label
            does not currently change the result; it is kept for callers.

    Returns:
        ``(pid, text)`` where ``pid`` is column 0 stripped of whitespace and
        ``text`` is columns 1-3 joined by single spaces, or ``(None, None)``
        when the line cannot be parsed or has fewer than four columns.
    """
    try:
        fields = parse_csv_line(line)
        # Google: id, name, description, manufacturer...
        # Amazon: id, title, description, manufacturer...
        pid = fields[0].strip()
        text = "{} {} {}".format(fields[1], fields[2], fields[3])
    except (csv.Error, IndexError, StopIteration, TypeError):
        # Malformed or short record: signal "skip this line" to the caller.
        # Only data errors are caught, so real bugs are no longer hidden.
        return (None, None)
    return (pid, text)
|
||
|
||
if __name__ == "__main__":
    sc = SparkContext(appName="InvertedIndex")
    try:
        # Broadcast the stop-word set instead of capturing it in the lambda,
        # so it is not serialized into every task closure.
        stopwords_bc = sc.broadcast(load_stopwords(sc))

        # Load the raw CSV lines of both product datasets.
        google = sc.textFile("hdfs://master:9000/user/root/GoogleProducts.csv")
        amazon = sc.textFile("hdfs://master:9000/user/root/AmazonProducts.csv")

        # Parse each line into (product_id, text); extract_info returns
        # (None, None) for lines it cannot parse, which are dropped here.
        google_rdd = google.map(lambda line: extract_info(line, "google")) \
            .filter(lambda x: x[0] is not None)
        amazon_rdd = amazon.map(lambda line: extract_info(line, "amazon")) \
            .filter(lambda x: x[0] is not None)

        # Merge the two datasets.
        all_data = google_rdd.union(amazon_rdd)

        # Build the inverted index: word -> sorted list of product ids.
        # distinct() removes repeated (word, id) pairs before grouping, and
        # sorting makes the posting lists deterministic between runs.
        inverted_index = all_data.flatMap(
            lambda x: [(word, x[0]) for word in tokenize(x[1])
                       if word not in stopwords_bc.value]) \
            .distinct() \
            .groupByKey() \
            .mapValues(lambda ids: sorted(ids))

        # Save to HDFS, using the same explicit namenode URI as the inputs
        # rather than relying on the cluster's default filesystem setting.
        inverted_index.saveAsTextFile(
            "hdfs://master:9000/user/root/output/inverted_index")
    finally:
        # Always release the Spark context, even when a step above fails.
        sc.stop()
|