From 5770bc266ee671cf5df2de481d1399aaf09c2ca2 Mon Sep 17 00:00:00 2001
From: fly6516
Date: Sun, 20 Apr 2025 03:19:24 +0800
Subject: [PATCH] feat(6-1): implement TF-IDF and cosine similarity computation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add tokenization and data-file parsing
- Implement inverse document frequency (IDF) computation
- Compute TF-IDF weights
- Add vector norm computation
- Implement an inverted index and fast cosine similarity computation
- Process the full datasets and compute similarities
---
 6-1.py | 135 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 132 insertions(+), 3 deletions(-)

diff --git a/6-1.py b/6-1.py
index 1e92078..228c4b8 100644
--- a/6-1.py
+++ b/6-1.py
@@ -1,12 +1,141 @@
 from pyspark import SparkContext
 from pyspark.accumulators import AccumulatorParam
 import matplotlib.pyplot as plt
+import re
+import math
+from pyspark.sql import SQLContext
+from pyspark import Broadcast
 
-# Create SparkContext
+# Create SparkContext and SQLContext
 sc = SparkContext(appName="TextAnalysis")
+sqlContext = SQLContext(sc)
 
-# Assume similaritiesFullRDD and goldStandard already exist
-# similaritiesFullRDD: RDD of ((Amazon ID, Google URL), Similarity)
+# Data file paths
+amazon_path = "hdfs://master:9000/user/root/Amazon_small.csv"
+google_path = "hdfs://master:9000/user/root/Google_small.csv"
+
+def tokenize(text):
+    """ Tokenize: lowercase the text and extract alphanumeric tokens """
+    return re.findall(r'\w+', text.lower())
+
+def parse_data_file(line):
+    """ Parse one line of a data file into (doc_id, text), or None if malformed """
+    line = line.strip()
+    if not line:
+        return None
+    parts = line.split(',')  # naive CSV split: assumes fields contain no embedded commas
+    if len(parts) < 5:
+        return None
+    doc_id = parts[0].strip()
+    text = "{} {} {}".format(parts[1].strip(), parts[2].strip(), parts[3].strip())
+    return (doc_id, text)
+
+def load_data(path):
+    """ Load and parse a data file, dropping malformed lines """
+    raw_data = sc.textFile(path).map(parse_data_file).filter(lambda x: x is not None)
+    return raw_data
+
+amazon = load_data(amazon_path)
+google = load_data(google_path)
+
+amazon_rec_to_token = amazon.map(lambda x: (x[0], tokenize(x[1])))
+google_rec_to_token = google.map(lambda x: (x[0], tokenize(x[1])))
+
+full_corpus_rdd = amazon_rec_to_token.union(google_rec_to_token)
+
+def idfs(corpus):
+    """ Compute inverse document frequency: IDF(t) = N / n(t) """
+    N = corpus.count()  # total number of documents
+    term_doc_pairs = corpus.flatMap(lambda x: [(term, x[0]) for term in set(x[1])])
+    df_rdd = term_doc_pairs.distinct().map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
+    idf_rdd = df_rdd.map(lambda x: (x[0], float(N) / float(x[1])))
+    return idf_rdd
+
+amazonFullRecToToken = amazon.map(lambda line: (line[0], tokenize(line[1])))
+googleFullRecToToken = google.map(lambda line: (line[0], tokenize(line[1])))
+print('Amazon full dataset is {} products, Google full dataset is {} products'.format(
+    amazonFullRecToToken.count(),
+    googleFullRecToToken.count()))
+
+fullCorpusRDD = amazonFullRecToToken.union(googleFullRecToToken)
+idfsFull = idfs(fullCorpusRDD)
+idfsFullCount = idfsFull.count()
+print('There are %s unique tokens in the full datasets.' % idfsFullCount)
+
+idfsFullWeights = idfsFull.collectAsMap()
+idfsFullBroadcast = sc.broadcast(idfsFullWeights)
+
+def tfidf(tokens, idfs):
+    """ Compute TF-IDF weights: raw term frequency times IDF """
+    tf = {}
+    for token in tokens:
+        tf[token] = tf.get(token, 0) + 1
+    tfidf_weights = {}
+    for token, freq in tf.items():
+        if token in idfs:
+            tfidf_weights[token] = freq * idfs[token]
+    return tfidf_weights
+
+amazonWeightsRDD = amazonFullRecToToken.map(lambda x: (x[0], tfidf(x[1], idfsFullBroadcast.value)))
+googleWeightsRDD = googleFullRecToToken.map(lambda x: (x[0], tfidf(x[1], idfsFullBroadcast.value)))
+print('There are {} Amazon weights and {} Google weights.'.format(amazonWeightsRDD.count(),
+                                                                  googleWeightsRDD.count()))
+
+def norm(weights):
+    """ Compute the Euclidean norm of a TF-IDF weight vector """
+    return math.sqrt(sum([w * w for w in weights.values()]))
+
+amazonNorms = amazonWeightsRDD.map(lambda x: (x[0], norm(x[1])))
+amazonNormsBroadcast = sc.broadcast(amazonNorms.collectAsMap())
+googleNorms = googleWeightsRDD.map(lambda x: (x[0], norm(x[1])))
+googleNormsBroadcast = sc.broadcast(googleNorms.collectAsMap())
+
+def invert(record):
+    """ Invert (ID, weight dict) to a list of (token, ID) pairs """
+    doc_id = record[0]
+    weights = record[1]
+    pairs = [(token, doc_id) for token in weights.keys()]
+    return pairs
+
+amazonInvPairsRDD = amazonWeightsRDD.flatMap(lambda x: invert(x)).cache()
+googleInvPairsRDD = googleWeightsRDD.flatMap(lambda x: invert(x)).cache()
+print('There are {} Amazon inverted pairs and {} Google inverted pairs.'.format(amazonInvPairsRDD.count(),
+                                                                                googleInvPairsRDD.count()))
+
+def swap(record):
+    """ Swap (token, (ID, URL)) to ((ID, URL), token) """
+    token = record[0]
+    keys = record[1]
+    return (keys, token)
+
+commonTokens = (amazonInvPairsRDD
+                .join(googleInvPairsRDD)
+                .map(lambda x: swap(x))
+                .groupByKey()
+                .map(lambda x: (x[0], list(x[1])))
+                .cache())
+
+print('Found %d common tokens' % commonTokens.count())
+
+amazonWeightsBroadcast = sc.broadcast(amazonWeightsRDD.collectAsMap())
+googleWeightsBroadcast = sc.broadcast(googleWeightsRDD.collectAsMap())
+
+def fastCosineSimilarity(record):
+    """ Compute cosine similarity using broadcast weights and norms """
+    amazonRec = record[0][0]
+    googleRec = record[0][1]
+    tokens = record[1]  # only the tokens the two records share contribute to the dot product
+    s = sum([(amazonWeightsBroadcast.value[amazonRec].get(token, 0) * googleWeightsBroadcast.value[googleRec].get(token, 0))
+             for token in tokens])
+    value = s / (amazonNormsBroadcast.value[amazonRec] * googleNormsBroadcast.value[googleRec])
+    key = (amazonRec, googleRec)
+    return (key, value)
+
+similaritiesFullRDD = commonTokens.map(lambda x: fastCosineSimilarity(x)).cache()
+
+print(similaritiesFullRDD.count())
+
+# Assume goldStandard already exists
 # goldStandard: RDD of ((Amazon ID, Google URL), 1) for true duplicates
 
 # Create simsFullRDD and simsFullValuesRDD
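
Follow-up note (not part of the patch): the trailing comments point to the next
step, deriving simsFullRDD and simsFullValuesRDD from similaritiesFullRDD. A
minimal sketch of one way to build them, assuming a "AmazonID GoogleURL"
string-key convention (an assumption; this patch does not fix the format):

    # Hypothetical follow-up, not in this patch:
    # re-key ((Amazon ID, Google URL), sim) as ("Amazon ID Google URL", sim)
    simsFullRDD = similaritiesFullRDD.map(
        lambda rec: ('%s %s' % (rec[0][0], rec[0][1]), rec[1]))
    # keep just the similarity values, e.g. to histogram them with matplotlib
    simsFullValuesRDD = simsFullRDD.map(lambda rec: rec[1]).cache()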