diff --git a/3-1.py b/3-1.py
index c8f5062..ced77e3 100644
--- a/3-1.py
+++ b/3-1.py
@@ -1,66 +1,115 @@
-# coding=utf-8
+# -*- coding: utf-8 -*-
+"""
+Lab step 3: use TF-IDF weighting to improve the accuracy of text-similarity computation
+    1. Read Amazon_small.csv and Google_small.csv, extracting each document ID
+       and its text (the combination of title, description and manufacturer);
+    2. Tokenize the text and build the corpus in the form (doc_id, [token list]);
+    3. Compute TF (term frequency): the number of occurrences of each term in a
+       document divided by the document's total number of terms;
+    4. Compute IDF (inverse document frequency): IDF(t) = N / n(t), where N is the
+       total number of documents and n(t) is the number of documents containing t;
+    5. Compute TF-IDF: for each term, TF-IDF = TF * IDF;
+    6. Output format: ((doc_id, term), tfidf_value)
+
+Note: make sure the following files have been uploaded to HDFS:
+    - Amazon_small.csv
+    - Google_small.csv
+    - Stopword filtering can be added or adjusted as needed (this example does not
+      remove stopwords)
+"""
+
 from pyspark import SparkContext
-import csv
 import re
 
-# Python 3.5 has no f-strings; use str.format
-def tokenize(text):
-    # Tokenize, keeping letters and digits
-    return re.findall(r'\w+', text.lower())
+sc = SparkContext(appName="TFIDF_Analysis")
+
+# Adjust the HDFS host and port for your environment, e.g. "hdfs://localhost:9000"
+amazon_path = "hdfs://master:9000/user/root/Amazon_small.csv"
+google_path = "hdfs://master:9000/user/root/Google_small.csv"
 
-def load_stopwords(sc):
-    try:
-        return set(sc.textFile("hdfs://master:9000/user/root/stopwords.txt").collect())
-    except:
-        # fallback to local
-        with open("stopwords.txt", "r") as f:
-            return set([line.strip() for line in f])
 
 def parse_csv_line(line):
-    # Use csv.reader to handle comma-separated fields that contain quotes
-    reader = csv.reader([line])
-    return next(reader)
+    """
+    Parse one CSV line (fields are assumed to be separated by '","' and the whole
+    line wrapped in quotes).
+    Returns (doc_id, text); text is the concatenation of the title, description
+    and manufacturer fields.
+    """
+    line = line.strip()
+    if not line:
+        return None
+    # Strip the outer quotes, then split on '","'
+    # Note: this is a simplification and assumes the CSV has no embedded quotes
+    parts = line.strip('"').split('","')
+    if len(parts) < 4:
+        return None
+    doc_id = parts[0].strip()
+    # Combine title, description and manufacturer
+    text = "{} {} {}".format(parts[1].strip(), parts[2].strip(), parts[3].strip())
+    return (doc_id, text)
 
-def extract_info(line, source):
-    try:
-        fields = parse_csv_line(line)
-        if source == "google":
-            # Google: id, name, description, manufacturer...
-            pid = fields[0].strip()
-            text = "{} {} {}".format(fields[1], fields[2], fields[3])
-        else:
-            # Amazon: id, title, description, manufacturer...
-            pid = fields[0].strip()
-            text = "{} {} {}".format(fields[1], fields[2], fields[3])
-        return (pid, text)
-    except:
-        return (None, None)
 
-if __name__ == "__main__":
-    sc = SparkContext(appName="InvertedIndex")
+def tokenize(text):
+    """
+    Tokenize: lowercase the text and extract all runs of alphanumeric characters
+    """
+    return re.findall(r'\w+', text.lower())
 
-    stopwords = load_stopwords(sc)
 
-    # Load the data
-    google = sc.textFile("hdfs://master:9000/user/root/Google.csv")
-    amazon = sc.textFile("hdfs://master:9000/user/root/Amazon.csv")
+# Read and parse the data files
+# Filter out a possible header row (assumes its doc_id field is "id")
+amazon_rdd = sc.textFile(amazon_path).map(parse_csv_line) \
+    .filter(lambda x: x is not None and x[0].lower() != "id")
+google_rdd = sc.textFile(google_path).map(parse_csv_line) \
+    .filter(lambda x: x is not None and x[0].lower() != "id")
 
-    # Extract the content
-    google_rdd = google.map(lambda line: extract_info(line, "google")) \
-                       .filter(lambda x: x[0] is not None)
+# Convert to (doc_id, [token list])
+amazonRecToToken = amazon_rdd.map(lambda x: (x[0], tokenize(x[1])))
+googleRecToToken = google_rdd.map(lambda x: (x[0], tokenize(x[1])))
 
-    amazon_rdd = amazon.map(lambda line: extract_info(line, "amazon")) \
-                       .filter(lambda x: x[0] is not None)
+# Merge into a single corpus
+corpus = amazonRecToToken.union(googleRecToToken)
+N = corpus.count()  # total number of documents in the corpus
 
-    # Merge the two datasets
-    all_data = google_rdd.union(amazon_rdd)
+# --------------- Compute TF (term frequency) ---------------
+# For each document, emit ((doc_id, term), 1) pairs
+doc_term_pairs = corpus.flatMap(lambda x: [((x[0], term), 1) for term in x[1]])
 
-    # Build the inverted index
-    inverted_index = all_data.flatMap(lambda x: [(word, x[0]) for word in tokenize(x[1]) if word not in stopwords]) \
-                             .groupByKey() \
-                             .mapValues(lambda ids: list(set(ids)))
+# Sum identical terms within a document to get the count per (doc_id, term)
+doc_term_counts = doc_term_pairs.reduceByKey(lambda a, b: a + b)
 
-    # Output (save to HDFS)
-    inverted_index.saveAsTextFile("hdfs://master:9000/user/root/output/inverted_index")
+# Total number of terms in each document: (doc_id, total_terms)
+doc_lengths = corpus.map(lambda x: (x[0], len(x[1])))
 
-    sc.stop()
+# Re-key doc_term_counts as (doc_id, (term, count)) so it can be joined
+doc_term_counts_mapped = doc_term_counts.map(lambda x: (x[0][0], (x[0][1], x[1])))
+
+# Join to attach each document's total term count
+tf_joined = doc_term_counts_mapped.join(doc_lengths)
+# tf_joined format: (doc_id, ((term, count), total_terms))
+
+# TF = count / total_terms, emitted as ((doc_id, term), tf_value)
+tf_rdd = tf_joined.map(lambda x: ((x[0], x[1][0][0]), float(x[1][0][1]) / float(x[1][1])))
+
+# --------------- Compute IDF (inverse document frequency) ---------------
+# Emit (term, doc_id) pairs per document; set() deduplicates so a term is counted
+# at most once per document
+term_doc_pairs = corpus.flatMap(lambda x: [(term, x[0]) for term in set(x[1])])
+# Count the number of documents containing each term
+df_rdd = term_doc_pairs.distinct().map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
+
+# IDF without the logarithm: IDF(t) = N / df
+idf_rdd = df_rdd.map(lambda x: (x[0], float(N) / float(x[1])))
+
+# --------------- Compute TF-IDF ---------------
+# Re-key tf_rdd by term so it can be joined with idf_rdd
+tf_rdd_by_term = tf_rdd.map(lambda x: (x[0][1], (x[0][0], x[1])))
+# Join to get (term, ((doc_id, tf), idf))
+tfidf_joined = tf_rdd_by_term.join(idf_rdd)
+# TF-IDF = tf * idf, output format ((doc_id, term), tfidf_value)
+tfidf_rdd = tfidf_joined.map(lambda x: ((x[1][0][0], x[0]), x[1][0][1] * x[1][1]))
+
+# --------------- Save the TF-IDF results ---------------
+
+output_path = "hdfs://master:9000/user/root/output/tfidf"
+tfidf_rdd.saveAsTextFile(output_path)
+
+# For debugging, print the first 5 TF-IDF results
+for item in tfidf_rdd.take(5):
+    print(item)
+
+sc.stop()
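A quick way to sanity-check the TF/IDF definitions this patch uses (TF = count / total terms, IDF = N / df with no logarithm) is to run the same formulas locally in plain Python on a tiny corpus. The sketch below is not part of the patch; the document IDs and texts are made up for illustration, and the expected values are small enough to verify by hand.

import re
from collections import Counter

def tokenize(text):
    return re.findall(r'\w+', text.lower())

# Hypothetical two-document toy corpus
corpus = [
    ("a1", tokenize("iPod touch 8GB black")),   # 4 tokens
    ("g1", tokenize("apple ipod touch 8 gb")),  # 5 tokens
]

N = len(corpus)
# df: number of documents containing each term (set() counts a term once per doc)
df = Counter(term for _, tokens in corpus for term in set(tokens))

tfidf = {}
for doc_id, tokens in corpus:
    counts = Counter(tokens)
    total = len(tokens)
    for term, count in counts.items():
        tfidf[(doc_id, term)] = (float(count) / total) * (float(N) / df[term])

print(tfidf[("a1", "touch")])  # 0.25 -> tf 1/4, idf 2/2
print(tfidf[("a1", "black")])  # 0.5  -> tf 1/4, idf 2/1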