from pyspark import SparkContext
from pyspark.accumulators import AccumulatorParam
import matplotlib.pyplot as plt
import re
import math
from pyspark.sql import SQLContext

# Create the SparkContext and SQLContext
sc = SparkContext(appName="TextAnalysis")
sqlContext = SQLContext(sc)

# Paths to the data files on HDFS
amazon_path = "hdfs://master:9000/user/root/Amazon_small.csv"
google_path = "hdfs://master:9000/user/root/Google_small.csv"


def tokenize(text):
    """Tokenize: lower-case the text and extract alphanumeric tokens."""
    return re.findall(r'\w+', text.lower())


def parse_data_file(line):
    """Parse one line of a data file into (doc_id, text).

    Note: the plain split on ',' assumes the fields themselves contain no commas.
    """
    line = line.strip()
    if not line:
        return None
    parts = line.split(',')
    if len(parts) < 5:
        return None
    doc_id = parts[0].strip()
    # Concatenate title, description and manufacturer into a single text field
    text = "{} {} {}".format(parts[1].strip(), parts[2].strip(), parts[3].strip())
    return (doc_id, text)


def load_data(path):
    """Read and parse a data file, dropping lines that cannot be parsed."""
    raw_data = sc.textFile(path).map(parse_data_file).filter(lambda x: x is not None)
    return raw_data


amazon = load_data(amazon_path)
google = load_data(google_path)


def idfs(corpus):
    """Compute the inverse document frequency (IDF) of every token in the corpus."""
    N = corpus.count()  # total number of documents
    # One (token, doc_id) pair per distinct token per document
    term_doc_pairs = corpus.flatMap(lambda x: [(term, x[0]) for term in set(x[1])])
    # Document frequency per token
    df_rdd = term_doc_pairs.distinct().map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
    # IDF = N / document frequency
    idf_rdd = df_rdd.map(lambda x: (x[0], float(N) / float(x[1])))
    return idf_rdd


amazonFullRecToToken = amazon.map(lambda line: (line[0], tokenize(line[1])))
googleFullRecToToken = google.map(lambda line: (line[0], tokenize(line[1])))
print('Amazon full dataset is {} products, Google full dataset is {} products'.format(
    amazonFullRecToToken.count(), googleFullRecToToken.count()))

fullCorpusRDD = amazonFullRecToToken.union(googleFullRecToToken)
idfsFull = idfs(fullCorpusRDD)
idfsFullCount = idfsFull.count()
print('There are %s unique tokens in the full datasets.' % idfsFullCount)
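# Optional sanity check (a sketch added here, not part of the pipeline above): the
# tokens with the smallest IDF appear in the most documents and should look like
# stop words or catalogue boilerplate. takeOrdered is a standard RDD action, so
# nothing beyond idfsFull as computed above is assumed.
for token, weight in idfsFull.takeOrdered(10, key=lambda x: x[1]):
    print('  %s: %.3f' % (token, weight))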
idfsFullWeights = idfsFull.collectAsMap()
idfsFullBroadcast = sc.broadcast(idfsFullWeights)


def tfidf(tokens, idfs):
    """Compute TF-IDF weights for a list of tokens, given an IDF dictionary."""
    tf = {}
    for token in tokens:
        tf[token] = tf.get(token, 0) + 1
    tfidf_weights = {}
    for token, freq in tf.items():
        if token in idfs:
            tfidf_weights[token] = freq * idfs[token]
    return tfidf_weights


amazonWeightsRDD = amazonFullRecToToken.map(lambda x: (x[0], tfidf(x[1], idfsFullBroadcast.value)))
googleWeightsRDD = googleFullRecToToken.map(lambda x: (x[0], tfidf(x[1], idfsFullBroadcast.value)))
print('There are {} Amazon weights and {} Google weights.'.format(
    amazonWeightsRDD.count(), googleWeightsRDD.count()))


def norm(weights):
    """Compute the Euclidean (L2) norm of a TF-IDF weight dictionary."""
    return math.sqrt(sum([w * w for w in weights.values()]))


amazonNorms = amazonWeightsRDD.map(lambda x: (x[0], norm(x[1])))
amazonNormsBroadcast = sc.broadcast(amazonNorms.collectAsMap())
googleNorms = googleWeightsRDD.map(lambda x: (x[0], norm(x[1])))
googleNormsBroadcast = sc.broadcast(googleNorms.collectAsMap())


def invert(record):
    """Invert (ID, weights) to a list of (token, ID) pairs."""
    record_id = record[0]
    weights = record[1]
    pairs = [(token, record_id) for token in weights.keys()]
    return pairs


amazonInvPairsRDD = amazonWeightsRDD.flatMap(lambda x: invert(x)).cache()
googleInvPairsRDD = googleWeightsRDD.flatMap(lambda x: invert(x)).cache()
print('There are {} Amazon inverted pairs and {} Google inverted pairs.'.format(
    amazonInvPairsRDD.count(), googleInvPairsRDD.count()))


def swap(record):
    """Swap (token, (amazonID, googleID)) to ((amazonID, googleID), token)."""
    token = record[0]
    keys = record[1]
    return (keys, token)


# For every (Amazon, Google) pair, collect the tokens the two records share
commonTokens = (amazonInvPairsRDD
                .join(googleInvPairsRDD)
                .map(lambda x: swap(x))
                .groupByKey()
                .map(lambda x: (x[0], list(x[1])))
                .cache())
print('Found %d common tokens' % commonTokens.count())

amazonWeightsBroadcast = sc.broadcast(amazonWeightsRDD.collectAsMap())
googleWeightsBroadcast = sc.broadcast(googleWeightsRDD.collectAsMap())


def fastCosineSimilarity(record):
    """Compute cosine similarity for one (Amazon, Google) pair using broadcast variables."""
    amazonRec = record[0][0]
    googleRec = record[0][1]
    tokens = record[1]
    s = sum([(amazonWeightsBroadcast.value[amazonRec].get(token, 0)
              * googleWeightsBroadcast.value[googleRec].get(token, 0))
             for token in tokens])
    value = s / (amazonNormsBroadcast.value[amazonRec] * googleNormsBroadcast.value[googleRec])
    key = (amazonRec, googleRec)
    return (key, value)


similaritiesFullRDD = commonTokens.map(lambda x: fastCosineSimilarity(x)).cache()
print(similaritiesFullRDD.count())

# Gold standard of known duplicates: RDD of ('Amazon ID Google URL', 1).
# The key must use the same 'ID URL' string format as simsFullRDD below,
# otherwise the leftOuterJoin can never match.
goldStandard = sc.parallelize([
    ("b00005lzly http://www.google.com/base/feeds/snippets/13823221823254120257", 1),
    # add further known duplicate pairs here
])

# Build simsFullRDD keyed by 'Amazon ID Google URL', plus an RDD of the similarity values alone
simsFullRDD = similaritiesFullRDD.map(lambda x: ("%s %s" % (x[0][0], x[0][1]), x[1]))
simsFullValuesRDD = simsFullRDD.map(lambda x: x[1]).cache()


# Similarity scores of the true duplicate pairs (0 when a gold pair has no computed similarity)
def gs_value(record):
    if record[1][1] is None:
        return 0
    else:
        return record[1][1]


trueDupSimsRDD = (goldStandard
                  .leftOuterJoin(simsFullRDD)
                  .map(gs_value)
                  .cache())
print('There are %s true duplicates.' % trueDupSimsRDD.count())
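# Optional sanity check (a sketch, not part of the original analysis): the gold
# pairs should on average score much higher than arbitrary record pairs, so
# comparing the two means gives a quick read on the TF-IDF weighting.
# Both mean() calls are standard RDD actions on the numeric RDDs built above.
print('Mean similarity: %.4f for true duplicates vs. %.4f over all compared pairs.' % (
    trueDupSimsRDD.mean(), simsFullValuesRDD.mean()))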
# Accumulator parameter that adds integer vectors element-wise
class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return [0] * len(value)

    def addInPlace(self, val1, val2):
        for i in range(len(val1)):
            val1[i] += val2[i]
        return val1


def set_bit(x, value, length):
    """Return a vector of `length` zeros with `value` at position x."""
    bits = []
    for y in range(length):
        if x == y:
            bits.append(value)
        else:
            bits.append(0)
    return bits


BINS = 101
nthresholds = 100


def bin(similarity):
    """Map a similarity in [0, 1] to a histogram bin index."""
    return int(similarity * nthresholds)


# Histogram of similarity scores: +1 for every compared pair, -1 for every true
# duplicate, so the counts that remain in each bin are false-positive candidates.
zeros = [0] * BINS
fpCounts = sc.accumulator(zeros, VectorAccumulatorParam())


def add_element(score):
    global fpCounts
    b = bin(score)
    fpCounts += set_bit(b, 1, BINS)


simsFullValuesRDD.foreach(add_element)


def sub_element(score):
    global fpCounts
    b = bin(score)
    fpCounts += set_bit(b, -1, BINS)


trueDupSimsRDD.foreach(sub_element)


def falsepos(threshold):
    fpList = fpCounts.value
    return sum([fpList[b] for b in range(0, BINS) if float(b) / nthresholds >= threshold])


def falseneg(threshold):
    return trueDupSimsRDD.filter(lambda x: x < threshold).count()


def truepos(threshold):
    return trueDupSimsRDD.count() - falseneg(threshold)


# Precision, recall and F-measure as functions of the similarity threshold
def precision(threshold):
    tp = truepos(threshold)
    return float(tp) / (tp + falsepos(threshold))


def recall(threshold):
    tp = truepos(threshold)
    return float(tp) / (tp + falseneg(threshold))


def fmeasure(threshold):
    r = recall(threshold)
    p = precision(threshold)
    if r == 0 and p == 0:
        return 0.0
    else:
        return 2 * r * p / (r + p)


# Candidate thresholds
thresholds = [float(n) / nthresholds for n in range(0, nthresholds)]

# Evaluate precision, recall and F-measure at each threshold
precisions = [precision(t) for t in thresholds]
recalls = [recall(t) for t in thresholds]
fmeasures = [fmeasure(t) for t in thresholds]

# Plot precision, recall and F-measure against the threshold
fig = plt.figure()
plt.plot(thresholds, precisions)
plt.plot(thresholds, recalls)
plt.plot(thresholds, fmeasures)
plt.legend(['Precision', 'Recall', 'F-measure'])
plt.show()

# Stop the SparkContext
sc.stop()
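# Optional (an assumption about the runtime environment, not part of the original
# script): on a headless cluster node plt.show() has no display to draw on, so
# saving the figure to a file keeps the result; the filename below is illustrative.
fig.savefig('precision_recall_fmeasure.png')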