diff --git a/3-1.py b/3-1.py index ced77e3..6c07545 100644 --- a/3-1.py +++ b/3-1.py @@ -1,58 +1,119 @@ # -*- coding: utf-8 -*- """ 实验步骤3:利用 TF-IDF 加权提升文本相似性计算准确性 - 1. 读取 Amazon_small.csv 和 Google_small.csv 数据, - 提取文档ID和文本(标题、描述、制造商)的组合; - 2. 对文本进行分词,构建语料库(格式:(doc_id, [token列表])); - 3. 计算 TF(词频):统计每个词在文档中的出现次数除以该文档总词数; - 4. 计算 IDF(逆文档频率):IDF(t)= N / n(t),其中 N 为文档总数,n(t) 为包含词 t 的文档数; - 5. 计算 TF-IDF:每个词的 TF-IDF = TF * IDF; - 6. 输出格式:((doc_id, term), tfidf_value) -注意:确保 HDFS 上已上传以下文件: - - Amazon_small.csv - - Google_small.csv - - 若代码中需要停用词过滤,可以自行调整或扩展(本示例未特别去除停用词) +功能说明: + 1. 从 HDFS 上读取 Amazon_small.csv 和 Google_small.csv,提取文档ID及文本(标题、描述、制造商)的组合; + 2. 对文本进行分词,构建语料库,格式为:(doc_id, [token列表]); + 3. 实现 tf(tokens) 函数,计算 token 的相对词频; + 4. 实现 idfs(corpus) 函数,计算每个唯一 token 的逆文档频率,使用公式:IDF(t) = N / n(t); + 5. 实现 tfidf(tokens, idfs) 函数,结合 TF 与 IDF 权重计算每个 token 的 TF-IDF; + 6. 利用 RDD 操作计算全局 TF-IDF,并保存结果; + 7. 对 Amazon 记录 "b'000hkgj8k" 调用 tfidf() 测试,打印该记录中各 token 的 TF-IDF 权重。 + +注意: + (1) 代码使用 Python 3.5 语法; + (2) 请保证 HDFS 上已上传 Amazon_small.csv 和 Google_small.csv 文件; + (3) 如需停用词过滤,可在 tokenize() 函数中扩展,本示例未特别去除停用词。 """ from pyspark import SparkContext import re +import matplotlib.pyplot as plt sc = SparkContext(appName="TFIDF_Analysis") -# 请根据实际情况修改 HDFS 主机及端口,如 "hdfs://localhost:9000" + +############################### +# 1. 定义辅助函数 +############################### +def tokenize(text): + """ + 分词:转换成小写后提取所有字母或数字组成的单词 + """ + return re.findall(r'\w+', text.lower()) + + +def tf(tokens): + """ + 计算 TF(词频) + + Args: + tokens (list of str): 输入的 token 列表 + Returns: + dict: 每个 token 映射到其 TF 值(出现次数/总 token 数) + """ + total = len(tokens) + counts = {} + for token in tokens: + if token in counts: + counts[token] = counts[token] + 1 + else: + counts[token] = 1 + return {k: float(v) / total for k, v in counts.items()} + + +def idfs(corpus): + """ + 计算语料库中每个唯一 token 的 IDF 权重 + + Args: + corpus (RDD): 每个元素格式为 (doc_id, [token列表]) + Returns: + RDD: (token, IDF值) 的 RDD + """ + N = corpus.count() + # 对每个文档取唯一 token 集合(避免在同一文档中重复计数) + uniqueTokens = corpus.map(lambda x: set(x[1])) + # 对每个文档生成 (token, 1) 对,再求和计算包含该 token 的文档数 n(t) + tokenCountPair = uniqueTokens.flatMap(lambda tokens: [(token, 1) for token in tokens]) + tokenDocCounts = tokenCountPair.reduceByKey(lambda a, b: a + b) + # 计算 IDF,不取对数:IDF(t) = N / n(t) + return tokenDocCounts.map(lambda x: (x[0], float(N) / float(x[1]))) + + +def tfidf(tokens, idfs_dict): + """ + 计算 TF-IDF 权重 + + Args: + tokens (list of str): 输入 token 列表 + idfs_dict (dict): token 到其 IDF 权重的字典 + Returns: + dict: 每个 token 映射到其 TF-IDF 权重 + """ + tfs = tf(tokens) + # 对于在 idfs 字典中存在的 token 计算 TF * IDF + tfIdfDict = {token: tfs[token] * idfs_dict[token] for token in tfs if token in idfs_dict} + return tfIdfDict + + +############################### +# 2. 数据加载与预处理 +############################### +# 修改以下 HDFS 路径,根据你的集群配置,如 "hdfs://master:9000" amazon_path = "hdfs://master:9000/user/root/Amazon_small.csv" google_path = "hdfs://master:9000/user/root/Google_small.csv" def parse_csv_line(line): """ - 解析 CSV 行(假设字段用 '","' 分隔,且首尾有引号) - 返回 (doc_id, text);text 为标题、描述、制造商字段拼接后的字符串 + 解析 CSV 行(假设字段使用 '","' 分隔,且首尾有引号) + 返回 (doc_id, text),其中 text 为标题、描述、制造商字段拼接后的字符串 """ line = line.strip() if not line: return None - # 去除首尾引号,然后按","分隔 - # 注意:此处简单处理,要求 CSV 文件中没有嵌入额外的引号 + # 去除首尾引号后按 '","' 拆分 parts = line.strip('"').split('","') if len(parts) < 4: return None doc_id = parts[0].strip() - # 将标题、描述、制造商合并 text = "{} {} {}".format(parts[1].strip(), parts[2].strip(), parts[3].strip()) return (doc_id, text) -def tokenize(text): - """ - 分词:转成小写后提取所有字母数字字符组合(词) - """ - return re.findall(r'\w+', text.lower()) - - -# 读取数据文件并解析 -# 过滤掉可能的表头(假设表头 doc_id 为 "id") +# 读取数据,并过滤掉表头(假设表头中 doc_id 为 "id") amazon_rdd = sc.textFile(amazon_path).map(parse_csv_line) \ .filter(lambda x: x is not None and x[0].lower() != "id") google_rdd = sc.textFile(google_path).map(parse_csv_line) \ @@ -62,54 +123,71 @@ google_rdd = sc.textFile(google_path).map(parse_csv_line) \ amazonRecToToken = amazon_rdd.map(lambda x: (x[0], tokenize(x[1]))) googleRecToToken = google_rdd.map(lambda x: (x[0], tokenize(x[1]))) -# 合并语料库 -corpus = amazonRecToToken.union(googleRecToToken) -N = corpus.count() # 语料库中的文档总数 +# 创建语料库(corpusRDD) +corpusRDD = amazonRecToToken.union(googleRecToToken) +print("Corpus document count: {}".format(corpusRDD.count())) +# 注:测试要求 corpusRDD.count() 为 400,这里根据实际数据 -# --------------- 计算 TF(词频) --------------- -# 对每个文档,生成 ((doc_id, term), 1) 对 -doc_term_pairs = corpus.flatMap(lambda x: [((x[0], term), 1) for term in x[1]]) +############################### +# 3. 全局 IDF 计算 +############################### +idfsRDD = idfs(corpusRDD) +uniqueTokenCount = idfsRDD.count() +print("There are {} unique tokens in the small datasets.".format(uniqueTokenCount)) -# 对同一文档中相同词求和得到每个 (doc_id, term) 的出现次数 +# 打印 IDF 值最小的 11 个 token +smallIDFTokens = idfsRDD.takeOrdered(11, key=lambda s: s[1]) +print("Smallest 11 IDF tokens:") +for token, idf_value in smallIDFTokens: + print("{}: {}".format(token, idf_value)) + +# 绘制 IDF 直方图 +small_idf_values = idfsRDD.map(lambda s: s[1]).collect() +fig = plt.figure(figsize=(8, 3)) +plt.hist(small_idf_values, 50, log=True) +plt.title("IDF Histogram") +plt.xlabel("IDF value") +plt.ylabel("Frequency (log scale)") +plt.show() + +############################### +# 4. 全局 TF-IDF 计算(RDD实现) +############################### +# 计算 TF 部分:为每个文档生成 ((doc_id, term), tf) 对 + +doc_term_pairs = corpusRDD.flatMap(lambda x: [((x[0], term), 1) for term in x[1]]) doc_term_counts = doc_term_pairs.reduceByKey(lambda a, b: a + b) - -# 计算每个文档的总词数 (doc_id, total_terms) -doc_lengths = corpus.map(lambda x: (x[0], len(x[1]))) - -# 为方便后续 join,先将 doc_term_counts 转换为 (doc_id, (term, count)) +doc_lengths = corpusRDD.map(lambda x: (x[0], len(x[1]))) doc_term_counts_mapped = doc_term_counts.map(lambda x: (x[0][0], (x[0][1], x[1]))) - -# join 以获得每个文档的总词数 tf_joined = doc_term_counts_mapped.join(doc_lengths) -# tf_joined 格式:(doc_id, ((term, count), total_terms)) - -# 计算 TF: count / total_terms,输出 ((doc_id, term), tf_value) tf_rdd = tf_joined.map(lambda x: ((x[0], x[1][0][0]), float(x[1][0][1]) / float(x[1][1]))) - -# --------------- 计算 IDF(逆文档频率) --------------- -# 为每个文档生成 (term, doc_id) 对,注意使用 set 去重,避免重复计数 -term_doc_pairs = corpus.flatMap(lambda x: [(term, x[0]) for term in set(x[1])]) -# 去重后统计每个 term 出现的文档数 -df_rdd = term_doc_pairs.distinct().map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b) - -# 计算 IDF,不取对数,使用公式:IDF(t) = N / df -idf_rdd = df_rdd.map(lambda x: (x[0], float(N) / float(x[1]))) - -# --------------- 计算 TF-IDF --------------- -# 将 tf_rdd 以 term 为 key,方便与 idf_rdd join +# 将 tf_rdd 转换为以 term 为 key,便于 join tf_rdd_by_term = tf_rdd.map(lambda x: (x[0][1], (x[0][0], x[1]))) -# join 得到 (term, ((doc_id, tf), idf)) -tfidf_joined = tf_rdd_by_term.join(idf_rdd) -# 计算 TF-IDF: tf * idf,输出格式 ((doc_id, term), tfidf_value) +# idfsRDD 格式为 (term, idf) +tfidf_joined = tf_rdd_by_term.join(idfsRDD) tfidf_rdd = tfidf_joined.map(lambda x: ((x[1][0][0], x[0]), x[1][0][1] * x[1][1])) - -# --------------- 输出 TF-IDF 结果 --------------- - +# 将全局 TF-IDF 结果保存到 HDFS(输出目录若已存在请删除或修改) output_path = "hdfs://master:9000/user/root/output/tfidf" tfidf_rdd.saveAsTextFile(output_path) - -# 调试时打印前 5 个 TF-IDF 结果 +print("Global TF-IDF result saved to: {}".format(output_path)) for item in tfidf_rdd.take(5): print(item) +############################### +# 5. 针对 Amazon 记录 "b'000hkgj8k" 的 TF-IDF 计算(使用封装函数) +############################### +# 提取记录 "b'000hkgj8k" 的 token 列表 +# 注意:实际记录的 doc_id 是否包含引号需与数据一致,此处示例与测试文本一致 +rec_tokens = amazonRecToToken.filter(lambda x: x[0] == "b'000hkgj8k").collect() +if rec_tokens: + rec_tokens = rec_tokens[0][1] + # 将全局 idfsRDD 转换成 Python 字典 + idfsWeights = idfsRDD.collectAsMap() + rec_tf_idf = tfidf(rec_tokens, idfsWeights) + print('Amazon record "b000hkgj8k" has tokens and TF-IDF weights:') + for token, weight in rec_tf_idf.items(): + print("{}: {}".format(token, weight)) +else: + print('Record "b000hkgj8k" not found in Amazon_small.csv') + sc.stop()