# BD-exp-9/6-1.py
from pyspark import SparkContext
from pyspark.accumulators import AccumulatorParam
import matplotlib.pyplot as plt
import re
import math
from pyspark.sql import SQLContext
from pyspark import Broadcast
# Create the SparkContext and SQLContext
sc = SparkContext(appName="TextAnalysis")
sqlContext = SQLContext(sc)
# HDFS paths to the data files
amazon_path = "hdfs://master:9000/user/root/Amazon_small.csv"
google_path = "hdfs://master:9000/user/root/Google_small.csv"
def tokenize(text):
    """ Tokenize: lower-case the text and extract alphanumeric words. """
    return re.findall(r'\w+', text.lower())

def parse_data_file(line):
    """ Parse one line of a data file into (doc_id, text); return None for malformed lines. """
    line = line.strip()
    if not line:
        return None
    parts = line.split(',')
    if len(parts) < 5:
        return None
    doc_id = parts[0].strip()
    text = "{} {} {}".format(parts[1].strip(), parts[2].strip(), parts[3].strip())
    return (doc_id, text)
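# Assumption: each CSV row looks like id,title,description,manufacturer,... ;
# the three text fields after the id are concatenated into one document string.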
def load_data(path):
    """ Read and parse a data file, dropping malformed lines. """
    raw_data = sc.textFile(path).map(parse_data_file).filter(lambda x: x is not None)
    return raw_data
amazon = load_data(amazon_path)
google = load_data(google_path)
amazon_rec_to_token = amazon.map(lambda x: (x[0], tokenize(x[1])))
google_rec_to_token = google.map(lambda x: (x[0], tokenize(x[1])))
full_corpus_rdd = amazon_rec_to_token.union(google_rec_to_token)
def idfs(corpus):
    """ Compute the inverse document frequency (IDF) of every token in the corpus. """
    N = corpus.count()  # total number of documents
    term_doc_pairs = corpus.flatMap(lambda x: [(term, x[0]) for term in set(x[1])])
    df_rdd = term_doc_pairs.distinct().map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
    idf_rdd = df_rdd.map(lambda x: (x[0], float(N) / float(x[1])))
    return idf_rdd
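# Note: the IDF used here is the plain ratio idf(t) = N / df(t), not the more common
# log(N / df(t)); e.g. a token that appears in 2 of 200 documents gets weight 100.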
amazonFullRecToToken = amazon.map(lambda line: (line[0], tokenize(line[1])))
googleFullRecToToken = google.map(lambda line: (line[0], tokenize(line[1])))
print('Amazon full dataset is {} products, Google full dataset is {} products'.format(
amazonFullRecToToken.count(),
googleFullRecToToken.count()))
fullCorpusRDD = amazonFullRecToToken.union(googleFullRecToToken)
idfsFull = idfs(fullCorpusRDD)
idfsFullCount = idfsFull.count()
print('There are %s unique tokens in the full datasets.' % idfsFullCount)
idfsFullWeights = idfsFull.collectAsMap()
idfsFullBroadcast = sc.broadcast(idfsFullWeights)
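# Broadcasting the IDF map ships it to each executor once, so tasks can look up weights
# locally instead of serializing the full dictionary with every closure.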
def tfidf(tokens, idfs):
    """ Compute TF-IDF weights: raw term frequency in the document times the token's IDF. """
    tf = {}
    for token in tokens:
        tf[token] = tf.get(token, 0) + 1
    tfidf_weights = {}
    for token, freq in tf.items():
        if token in idfs:
            tfidf_weights[token] = freq * idfs[token]
    return tfidf_weights
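# Illustrative example (made-up values): for tokens ['usb', 'usb', 'cable'] with
# idfs {'usb': 4.0, 'cable': 20.0}, tfidf returns {'usb': 8.0, 'cable': 20.0}.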
amazonWeightsRDD = amazonFullRecToToken.map(lambda x: (x[0], tfidf(x[1], idfsFullBroadcast.value)))
googleWeightsRDD = googleFullRecToToken.map(lambda x: (x[0], tfidf(x[1], idfsFullBroadcast.value)))
print('There are {} Amazon weights and {} Google weights.'.format(amazonWeightsRDD.count(),
googleWeightsRDD.count()))
def norm(weights):
    """ Compute the Euclidean (L2) norm of a TF-IDF weight vector. """
    return math.sqrt(sum([w * w for w in weights.values()]))
amazonNorms = amazonWeightsRDD.map(lambda x: (x[0], norm(x[1])))
amazonNormsBroadcast = sc.broadcast(amazonNorms.collectAsMap())
googleNorms = googleWeightsRDD.map(lambda x: (x[0], norm(x[1])))
googleNormsBroadcast = sc.broadcast(googleNorms.collectAsMap())
def invert(record):
    """ Invert (ID, weight dict) into a list of (token, ID) pairs. """
    doc_id = record[0]
    weights = record[1]
    pairs = [(token, doc_id) for token in weights.keys()]
    return pairs
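# The inverted pairs form a token -> document index; joining the two indexes on the token
# key keeps only the (Amazon, Google) record pairs that share at least one token.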
amazonInvPairsRDD = amazonWeightsRDD.flatMap(lambda x: invert(x)).cache()
googleInvPairsRDD = googleWeightsRDD.flatMap(lambda x: invert(x)).cache()
print('There are {} Amazon inverted pairs and {} Google inverted pairs.'.format(amazonInvPairsRDD.count(),
googleInvPairsRDD.count()))
def swap(record):
    """ Swap (token, (Amazon ID, Google URL)) into ((Amazon ID, Google URL), token). """
    token = record[0]
    keys = record[1]
    return (keys, token)
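# commonTokens groups the shared tokens by candidate pair, giving one record
# ((Amazon ID, Google URL), [shared tokens]) per pair with overlapping vocabulary.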
commonTokens = (amazonInvPairsRDD
.join(googleInvPairsRDD)
.map(lambda x: swap(x))
.groupByKey()
.map(lambda x: (x[0], list(x[1])))
.cache())
print('Found %d common tokens' % commonTokens.count())
amazonWeightsBroadcast = sc.broadcast(amazonWeightsRDD.collectAsMap())
googleWeightsBroadcast = sc.broadcast(googleWeightsRDD.collectAsMap())
def fastCosineSimilarity(record):
    """ Compute cosine similarity for one candidate pair using the broadcast weights and norms. """
    amazonRec = record[0][0]
    googleRec = record[0][1]
    tokens = record[1]
    s = sum([(amazonWeightsBroadcast.value[amazonRec].get(token, 0) * googleWeightsBroadcast.value[googleRec].get(token, 0))
             for token in tokens])
    value = s / (amazonNormsBroadcast.value[amazonRec] * googleNormsBroadcast.value[googleRec])
    key = (amazonRec, googleRec)
    return (key, value)
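# Cosine similarity is dot(a, b) / (||a|| * ||b||); only shared tokens can contribute to the
# dot product, so summing over commonTokens is equivalent to the full dot product.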
similaritiesFullRDD = commonTokens.map(lambda x: fastCosineSimilarity(x)).cache()
print(similaritiesFullRDD.count())
# goldStandard is assumed to exist already:
# goldStandard: RDD of ("Amazon ID Google URL", 1) for each true duplicate pair,
# keyed the same way as simsFullRDD below so the two can be joined.
# Build simsFullRDD (string key -> similarity) and simsFullValuesRDD (similarities only)
simsFullRDD = similaritiesFullRDD.map(lambda x: ("%s %s" % (x[0][0], x[0][1]), x[1]))
simsFullValuesRDD = simsFullRDD.map(lambda x: x[1]).cache()
# Similarity scores of the true duplicate pairs (0 when a gold pair has no computed similarity)
def gs_value(record):
    if record[1][1] is None:
        return 0
    else:
        return record[1][1]
trueDupSimsRDD = (goldStandard
.leftOuterJoin(simsFullRDD)
.map(gs_value)
.cache())
print('There are %s true duplicates.' % trueDupSimsRDD.count())
# Accumulator that sums integer vectors element-wise (used as a shared histogram)
class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return [0] * len(value)
    def addInPlace(self, val1, val2):
        for i in range(len(val1)):
            val1[i] += val2[i]
        return val1
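# set_bit builds a one-hot (or minus-one-hot) vector of the given length, so adding it to the
# accumulator increments or decrements exactly one histogram bin.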
def set_bit(x, value, length):
    bits = []
    for y in range(length):
        if x == y:
            bits.append(value)
        else:
            bits.append(0)
    return bits
BINS = 101
nthresholds = 100
def bin(similarity):
    """ Map a similarity in [0, 1] to a histogram bin index. """
    return int(similarity * nthresholds)
zeros = [0] * BINS
fpCounts = sc.accumulator(zeros, VectorAccumulatorParam())
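# False-positive counting scheme: every candidate pair's similarity adds 1 to its bin,
# every true duplicate's similarity then subtracts 1, so fpCounts ends up holding the
# number of non-duplicate pairs per similarity bin.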
def add_element(score):
    global fpCounts
    b = bin(score)
    fpCounts += set_bit(b, 1, BINS)
simsFullValuesRDD.foreach(add_element)

def sub_element(score):
    global fpCounts
    b = bin(score)
    fpCounts += set_bit(b, -1, BINS)
trueDupSimsRDD.foreach(sub_element)
def falsepos(threshold):
    """ Number of non-duplicate pairs scored at or above the threshold. """
    fpList = fpCounts.value
    return sum([fpList[b] for b in range(0, BINS) if float(b) / nthresholds >= threshold])
def falseneg(threshold):
    """ Number of true duplicates scored below the threshold. """
    return trueDupSimsRDD.filter(lambda x: x < threshold).count()
def truepos(threshold):
    """ Number of true duplicates scored at or above the threshold. """
    return trueDupSimsRDD.count() - falseneg(threshold)
# Precision, recall and F-measure as functions of the similarity threshold
def precision(threshold):
    tp = truepos(threshold)
    return float(tp) / (tp + falsepos(threshold))
def recall(threshold):
    tp = truepos(threshold)
    return float(tp) / (tp + falseneg(threshold))
def fmeasure(threshold):
    r = recall(threshold)
    p = precision(threshold)
    if r == 0 and p == 0:
        return 0  # F-measure is undefined when both are zero; report 0, not a perfect score
    else:
        return 2 * r * p / (r + p)
# Threshold values 0.00, 0.01, ..., 0.99
thresholds = [float(n) / nthresholds for n in range(0, nthresholds)]
# Evaluate precision, recall and F-measure at each threshold
precisions = [precision(t) for t in thresholds]
recalls = [recall(t) for t in thresholds]
fmeasures = [fmeasure(t) for t in thresholds]
# Plot precision, recall and F-measure against the threshold
fig = plt.figure()
plt.plot(thresholds, precisions)
plt.plot(thresholds, recalls)
plt.plot(thresholds, fmeasures)
plt.legend(['Precision', 'Recall', 'F-measure'])
plt.show()
# Stop the SparkContext
sc.stop()