fix(3-1): 更新 HDFS 地址

- 将 HDFS 地址从 "hdfs:///user/root/output/inverted_index" 修改为 "hdfs://master:9000/user/root/output/inverted_index"
- 这个修改可能是为了适应不同的 HDFS集群配置,确保数据保存到正确的地址
This commit is contained in:
fly6516 2025-04-16 09:53:48 +08:00
parent ceaf470df4
commit 739ad9d6cb

149
3-1.py
View File

@ -1,66 +1,115 @@
# coding=utf-8
# -*- coding: utf-8 -*-
"""
实验步骤3利用 TF-IDF 加权提升文本相似性计算准确性
1. 读取 Amazon_small.csv Google_small.csv 数据
提取文档ID和文本标题描述制造商的组合
2. 对文本进行分词构建语料库格式(doc_id, [token列表])
3. 计算 TF词频统计每个词在文档中的出现次数除以该文档总词数
4. 计算 IDF逆文档频率IDF(t)= N / n(t)其中 N 为文档总数n(t) 为包含词 t 的文档数
5. 计算 TF-IDF每个词的 TF-IDF = TF * IDF
6. 输出格式((doc_id, term), tfidf_value)
注意确保 HDFS 上已上传以下文件
- Amazon_small.csv
- Google_small.csv
- 若代码中需要停用词过滤可以自行调整或扩展本示例未特别去除停用词
"""
from pyspark import SparkContext
import csv
import re
# Python 3.5 没有 f-string使用 format
def tokenize(text):
# 分词并保留英文、数字
return re.findall(r'\w+', text.lower())
sc = SparkContext(appName="TFIDF_Analysis")
# 请根据实际情况修改 HDFS 主机及端口,如 "hdfs://localhost:9000"
amazon_path = "hdfs://master:9000/user/root/Amazon_small.csv"
google_path = "hdfs://master:9000/user/root/Google_small.csv"
def load_stopwords(sc):
try:
return set(sc.textFile("hdfs://master:9000/user/root/stopwords.txt").collect())
except:
# fallback to local
with open("stopwords.txt", "r") as f:
return set([line.strip() for line in f])
def parse_csv_line(line):
# 使用 csv.reader 兼容逗号分隔含引号的数据
reader = csv.reader([line])
return next(reader)
"""
解析 CSV 假设字段用 '","' 分隔且首尾有引号
返回 (doc_id, text)text 为标题描述制造商字段拼接后的字符串
"""
line = line.strip()
if not line:
return None
# 去除首尾引号,然后按","分隔
# 注意:此处简单处理,要求 CSV 文件中没有嵌入额外的引号
parts = line.strip('"').split('","')
if len(parts) < 4:
return None
doc_id = parts[0].strip()
# 将标题、描述、制造商合并
text = "{} {} {}".format(parts[1].strip(), parts[2].strip(), parts[3].strip())
return (doc_id, text)
def extract_info(line, source):
try:
fields = parse_csv_line(line)
if source == "google":
# Google: id, name, description, manufacturer...
pid = fields[0].strip()
text = "{} {} {}".format(fields[1], fields[2], fields[3])
else:
# Amazon: id, title, description, manufacturer...
pid = fields[0].strip()
text = "{} {} {}".format(fields[1], fields[2], fields[3])
return (pid, text)
except:
return (None, None)
if __name__ == "__main__":
sc = SparkContext(appName="InvertedIndex")
def tokenize(text):
"""
分词转成小写后提取所有字母数字字符组合
"""
return re.findall(r'\w+', text.lower())
stopwords = load_stopwords(sc)
# 加载数据
google = sc.textFile("hdfs://master:9000/user/root/Google.csv")
amazon = sc.textFile("hdfs://master:9000/user/root/Amazon.csv")
# 读取数据文件并解析
# 过滤掉可能的表头(假设表头 doc_id 为 "id"
amazon_rdd = sc.textFile(amazon_path).map(parse_csv_line) \
.filter(lambda x: x is not None and x[0].lower() != "id")
google_rdd = sc.textFile(google_path).map(parse_csv_line) \
.filter(lambda x: x is not None and x[0].lower() != "id")
# 提取内容
google_rdd = google.map(lambda line: extract_info(line, "google")) \
.filter(lambda x: x[0] is not None)
# 转换为 (doc_id, [token列表])
amazonRecToToken = amazon_rdd.map(lambda x: (x[0], tokenize(x[1])))
googleRecToToken = google_rdd.map(lambda x: (x[0], tokenize(x[1])))
amazon_rdd = amazon.map(lambda line: extract_info(line, "amazon")) \
.filter(lambda x: x[0] is not None)
# 合并语料库
corpus = amazonRecToToken.union(googleRecToToken)
N = corpus.count() # 语料库中的文档总数
# 合并两数据集
all_data = google_rdd.union(amazon_rdd)
# --------------- 计算 TF词频 ---------------
# 对每个文档,生成 ((doc_id, term), 1) 对
doc_term_pairs = corpus.flatMap(lambda x: [((x[0], term), 1) for term in x[1]])
# 构建倒排索引
inverted_index = all_data.flatMap(lambda x: [((word, x[0])) for word in tokenize(x[1]) if word not in stopwords]) \
.groupByKey() \
.mapValues(lambda ids: list(set(ids)))
# 对同一文档中相同词求和得到每个 (doc_id, term) 的出现次数
doc_term_counts = doc_term_pairs.reduceByKey(lambda a, b: a + b)
# 输出(可保存到 HDFS
inverted_index.saveAsTextFile("hdfs://master:9000/user/root/output/inverted_index")
# 计算每个文档的总词数 (doc_id, total_terms)
doc_lengths = corpus.map(lambda x: (x[0], len(x[1])))
sc.stop()
# 为方便后续 join先将 doc_term_counts 转换为 (doc_id, (term, count))
doc_term_counts_mapped = doc_term_counts.map(lambda x: (x[0][0], (x[0][1], x[1])))
# join 以获得每个文档的总词数
tf_joined = doc_term_counts_mapped.join(doc_lengths)
# tf_joined 格式:(doc_id, ((term, count), total_terms))
# 计算 TF count / total_terms输出 ((doc_id, term), tf_value)
tf_rdd = tf_joined.map(lambda x: ((x[0], x[1][0][0]), float(x[1][0][1]) / float(x[1][1])))
# --------------- 计算 IDF逆文档频率 ---------------
# 为每个文档生成 (term, doc_id) 对,注意使用 set 去重,避免重复计数
term_doc_pairs = corpus.flatMap(lambda x: [(term, x[0]) for term in set(x[1])])
# 去重后统计每个 term 出现的文档数
df_rdd = term_doc_pairs.distinct().map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
# 计算 IDF不取对数使用公式IDF(t) = N / df
idf_rdd = df_rdd.map(lambda x: (x[0], float(N) / float(x[1])))
# --------------- 计算 TF-IDF ---------------
# 将 tf_rdd 以 term 为 key方便与 idf_rdd join
tf_rdd_by_term = tf_rdd.map(lambda x: (x[0][1], (x[0][0], x[1])))
# join 得到 (term, ((doc_id, tf), idf))
tfidf_joined = tf_rdd_by_term.join(idf_rdd)
# 计算 TF-IDF tf * idf输出格式 ((doc_id, term), tfidf_value)
tfidf_rdd = tfidf_joined.map(lambda x: ((x[1][0][0], x[0]), x[1][0][1] * x[1][1]))
# --------------- 输出 TF-IDF 结果 ---------------
output_path = "hdfs://master:9000/user/root/output/tfidf"
tfidf_rdd.saveAsTextFile(output_path)
# 调试时打印前 5 个 TF-IDF 结果
for item in tfidf_rdd.take(5):
print(item)
sc.stop()