# coding=utf-8
"""Build an inverted index over the Google and Amazon product catalogs with PySpark."""
from pyspark import SparkContext

import csv
import re


def tokenize(text):
    # Lowercase, then split into word tokens (letters and digits)
    return re.findall(r'\w+', text.lower())


def load_stopwords(sc):
    # Prefer the stopword list on HDFS; fall back to a local copy if it is unreachable
    try:
        return set(sc.textFile("hdfs://master:9000/user/root/stopwords.txt").collect())
    except Exception:
        with open("stopwords.txt", "r") as f:
            return {line.strip() for line in f}


def parse_csv_line(line):
    # csv.reader handles comma-separated data with quoted fields that may contain commas
    reader = csv.reader([line])
    return next(reader)


def extract_info(line):
    # Both catalogs share the same column layout:
    #   Google: id, name, description, manufacturer, ...
    #   Amazon: id, title, description, manufacturer, ...
    # Note: a header row, if present, would parse successfully and be indexed too;
    # filter it out upstream if the input files contain one.
    try:
        fields = parse_csv_line(line)
        pid = fields[0].strip()
        # Python 3.5 has no f-strings, so use str.format
        text = "{} {} {}".format(fields[1], fields[2], fields[3])
        return (pid, text)
    except Exception:
        # Malformed or short rows are dropped by the filter below
        return (None, None)


if __name__ == "__main__":
    sc = SparkContext(appName="InvertedIndex")
    stopwords = load_stopwords(sc)

    # Load the data
    google = sc.textFile("hdfs://master:9000/user/root/Google.csv")
    amazon = sc.textFile("hdfs://master:9000/user/root/Amazon.csv")

    # Extract (product_id, text) pairs, discarding unparseable lines
    google_rdd = google.map(extract_info).filter(lambda x: x[0] is not None)
    amazon_rdd = amazon.map(extract_info).filter(lambda x: x[0] is not None)

    # Merge the two datasets
    all_data = google_rdd.union(amazon_rdd)

    # Build the inverted index: word -> deduplicated list of product ids
    inverted_index = all_data.flatMap(
        lambda x: [(word, x[0]) for word in tokenize(x[1]) if word not in stopwords]
    ).groupByKey().mapValues(lambda ids: list(set(ids)))

    # Output (save to HDFS)
    inverted_index.saveAsTextFile("hdfs://master:9000/user/root/output/inverted_index")
    sc.stop()
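
# Optional tweak (a sketch, not part of the original job): for a large stopword
# list, sc.broadcast() ships the set to each executor once instead of
# serializing it inside every task closure. Assuming the `sc`, `stopwords`,
# `tokenize`, and `all_data` names defined above, the index step would become:
#
#     stopwords_bc = sc.broadcast(stopwords)
#     inverted_index = all_data.flatMap(
#         lambda x: [(word, x[0]) for word in tokenize(x[1])
#                    if word not in stopwords_bc.value]
#     ).groupByKey().mapValues(lambda ids: list(set(ids)))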