# BD-exp-9/5-1.py
# Scalable entity resolution: TF-IDF weighting and cosine similarity between
# the Amazon and Google product datasets, implemented with PySpark RDDs.
import re
import math
from pyspark import SparkContext
# Create the SparkContext
sc = SparkContext(appName="ScalableER")
# Data file paths (HDFS)
amazon_path = "hdfs://master:9000/user/root/Amazon_small.csv"
google_path = "hdfs://master:9000/user/root/Google_small.csv"
def tokenize(text):
    """Tokenize: lowercase the text and extract alphanumeric word tokens."""
    return re.findall(r'\w+', text.lower())
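# Illustrative example of the token pattern above:
#   tokenize("ClickArt 950,000 - Premier") -> ['clickart', '950', '000', 'premier']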
def parse_data_file(line):
    """Parse one line of the data file into (doc_id, text).

    Note: this is a naive comma split, so it assumes five fields
    (id, title, description, manufacturer, price) and will mishandle
    quoted fields that themselves contain commas.
    """
    line = line.strip()
    if not line:
        return None
    parts = line.split(',')
    if len(parts) < 5:
        return None
    doc_id = parts[0].strip()
    # Concatenate title, description, and manufacturer into one text blob
    text = "{} {} {}".format(parts[1].strip(), parts[2].strip(), parts[3].strip())
    return (doc_id, text)
# Read and parse the data
def load_data(path):
    """Read and parse a data file, dropping empty or malformed lines."""
    return sc.textFile(path).map(parse_data_file).filter(lambda x: x is not None)

amazon = load_data(amazon_path)
google = load_data(google_path)
# Tokenize the datasets
amazonFullRecToToken = amazon.map(lambda x: (x[0], tokenize(x[1])))
googleFullRecToToken = google.map(lambda x: (x[0], tokenize(x[1])))
# Compute IDF
def idfs(corpus):
    """Compute the inverse document frequency IDF(t) = N / df(t)."""
    N = corpus.count()  # total number of documents
    # Emit one (term, doc_id) pair per distinct term in each document
    term_doc_pairs = corpus.flatMap(lambda x: [(term, x[0]) for term in set(x[1])])
    # Document frequency: the number of documents containing each term
    df_rdd = term_doc_pairs.distinct().map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
    return df_rdd.map(lambda x: (x[0], float(N) / float(x[1])))
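# Note: this is the raw-ratio IDF(t) = N / df(t) with no logarithm, so a token
# appearing in only one of the N documents gets the maximum weight N.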
print('Amazon full dataset is {} products, Google full dataset is {} products'.format(
    amazonFullRecToToken.count(),
    googleFullRecToToken.count()))

# Compute IDF over the combined full corpus
fullCorpusRDD = amazonFullRecToToken.union(googleFullRecToToken)
idfsFull = idfs(fullCorpusRDD)
idfsFullCount = idfsFull.count()
print('There are %s unique tokens in the full datasets.' % idfsFullCount)
# Collect the IDF weights to the driver and broadcast them to every worker
idfsFullWeights = idfsFull.collectAsMap()
idfsFullBroadcast = sc.broadcast(idfsFullWeights)
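# NOTE: tfidf() is called below but is not defined anywhere in this file.
# The following is a minimal sketch consistent with the call site
# tfidf(tokens, idf_map), assuming normalized term frequency
# TF(t) = count(t) / len(tokens).
def tf(tokens):
    """Compute normalized term frequencies for a list of tokens."""
    counts = {}
    for t in tokens:
        counts[t] = counts.get(t, 0) + 1
    total = float(len(tokens))
    return {t: c / total for t, c in counts.items()}

def tfidf(tokens, idfs_map):
    """Combine TF and IDF into a {token: weight} dictionary."""
    tfs = tf(tokens)
    return {t: tfs[t] * idfs_map[t] for t in tfs}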
# Compute TF-IDF weights for the full datasets
amazonWeightsRDD = amazonFullRecToToken.map(lambda x: (x[0], tfidf(x[1], idfsFullBroadcast.value)))
googleWeightsRDD = googleFullRecToToken.map(lambda x: (x[0], tfidf(x[1], idfsFullBroadcast.value)))
print('There are {} Amazon weights and {} Google weights.'.format(
    amazonWeightsRDD.count(), googleWeightsRDD.count()))
# Compute the norm of a weight vector
def norm(weights):
    """Compute the Euclidean norm of a {token: weight} vector."""
    return math.sqrt(sum(w * w for w in weights.values()))
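# Illustrative sanity check: norm({'a': 3.0, 'b': 4.0}) == 5.0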
# Compute and broadcast the weight norms for Amazon and Google
amazonNorms = amazonWeightsRDD.map(lambda x: (x[0], norm(x[1])))
amazonNormsBroadcast = sc.broadcast(amazonNorms.collectAsMap())
googleNorms = googleWeightsRDD.map(lambda x: (x[0], norm(x[1])))
googleNormsBroadcast = sc.broadcast(googleNorms.collectAsMap())
# Build an inverted index
def invert(record):
    """Invert (ID, weight dict) to a list of (token, ID) pairs."""
    doc_id, weights = record
    return [(token, doc_id) for token in weights.keys()]
amazonInvPairsRDD = amazonWeightsRDD.flatMap(invert).cache()
googleInvPairsRDD = googleWeightsRDD.flatMap(invert).cache()
print('There are {} Amazon inverted pairs and {} Google inverted pairs.'.format(
    amazonInvPairsRDD.count(), googleInvPairsRDD.count()))
# Identify the tokens shared by each (Amazon, Google) record pair
def swap(record):
    """Swap (token, (amazonID, googleURL)) to ((amazonID, googleURL), token)."""
    token, keys = record
    return (keys, token)

commonTokens = (amazonInvPairsRDD
                .join(googleInvPairsRDD)
                .map(swap)
                .groupByKey()
                .map(lambda x: (x[0], list(x[1])))
                .cache())
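# Each record of commonTokens is ((amazonID, googleURL), [token, ...]): the
# join keys the two inverted indexes on token, and groupByKey then collects
# every token shared by that record pair.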
print('Found %d common tokens' % commonTokens.count())
# Broadcast the full weight maps, then compute cosine similarities
amazonWeightsBroadcast = sc.broadcast(amazonWeightsRDD.collectAsMap())
googleWeightsBroadcast = sc.broadcast(googleWeightsRDD.collectAsMap())
def fastCosineSimilarity(record):
    """Compute cosine similarity using the broadcast weight and norm maps."""
    amazonRec, googleRec = record[0]
    tokens = record[1]
    # Dot product over the shared tokens only; see the note below
    s = sum(amazonWeightsBroadcast.value[amazonRec].get(token, 0)
            * googleWeightsBroadcast.value[googleRec].get(token, 0)
            for token in tokens)
    value = s / (amazonNormsBroadcast.value[amazonRec] * googleNormsBroadcast.value[googleRec])
    return ((amazonRec, googleRec), value)
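# Cosine similarity: cos(a, g) = sum_t w_a(t) * w_g(t) / (||w_a|| * ||w_g||).
# Restricting the sum to the shared tokens is exact, because any token missing
# from either record contributes a zero product.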
similaritiesFullRDD = commonTokens.map(fastCosineSimilarity).cache()
# Inspect the result
print("Number of similarity records: {}".format(similaritiesFullRDD.count()))
# Look up the similarity for one known record pair
similarity_test = similaritiesFullRDD.filter(
    lambda x: x[0][0] == 'b00005lzly'
    and x[0][1] == 'http://www.google.com/base/feeds/snippets/13823221823254120257').collect()
print(len(similarity_test))
# Tests
assert len(similarity_test) == 1, "incorrect len(similarity_test)"
assert similaritiesFullRDD.count() == 2441088, "incorrect similaritiesFullRDD.count()"
sc.stop()