"""Scalable entity resolution between Amazon and Google product listings.

The script tokenizes both catalogs, weights tokens with TF-IDF computed over
the combined corpus, and scores every Amazon/Google pair that shares at least
one token by cosine similarity. An inverted index (token -> record ID) is
joined across the two catalogs so that only pairs with a common token are
scored.
"""
import csv
import math
import re
from collections import Counter

from pyspark import Broadcast  # not used below; kept from the original import list
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Create the SparkContext and SQLContext.
sc = SparkContext(appName="ScalableER")
sqlContext = SQLContext(sc)

# Input data files.
amazon_path = "hdfs://master:9000/user/root/Amazon_small.csv"
google_path = "hdfs://master:9000/user/root/Google_small.csv"

# One Amazon/Google ID pair that is looked up in the results as a spot check.
TEST_AMAZON_ID = 'b00005lzly'
TEST_GOOGLE_URL = 'http://www.google.com/base/feeds/snippets/13823221823254120257'

# Expected number of scored pairs.
# NOTE(review): this baseline depends on the input files and on the parsing
# rules in parse_data_file; it was presumably measured with the previous
# comma-split parser, so re-baseline it if the check below starts failing.
EXPECTED_SIMILARITY_COUNT = 27411

# Compiled once so tokenize() does not look the pattern up on every record.
_TOKEN_RE = re.compile(r'\w+')


def tokenize(text):
    """Return the lower-cased word tokens of `text`.

    A token is a maximal run of `\\w` characters (letters, digits and
    underscore). Duplicates are kept and order is preserved.
    """
    return _TOKEN_RE.findall(text.lower())


def parse_data_file(line):
    """Parse one CSV line into a (doc_id, text) pair.

    The line is parsed with the `csv` module so that double-quoted fields may
    contain commas and surrounding quotes are removed from every field,
    including the ID. `text` is columns 1-3 joined by single spaces.

    Returns None for blank lines, lines the csv parser rejects, rows with
    fewer than five columns, and a header row.
    """
    line = line.strip()
    if not line:
        return None
    try:
        parts = next(csv.reader([line]))
    except (csv.Error, StopIteration):
        return None
    if len(parts) < 5:
        return None
    doc_id = parts[0].strip()
    # Presumably each file starts with a header row whose first column is
    # "id" - TODO confirm against the data. Skipping is harmless if absent.
    if doc_id.lower() == 'id':
        return None
    text = "{} {} {}".format(parts[1].strip(), parts[2].strip(), parts[3].strip())
    return (doc_id, text)


def load_data(path):
    """Read `path` as text and return an RDD of (doc_id, text) pairs.

    Lines for which parse_data_file returns None are dropped.
    """
    return (sc.textFile(path)
            .map(parse_data_file)
            .filter(lambda rec: rec is not None))


def idfs(corpus):
    """Compute inverse document frequencies for an RDD of (doc_id, tokens).

    Returns an RDD of (token, N / df) where N is the number of records in
    `corpus` and df is the number of distinct doc IDs whose token list
    contains the token.
    """
    n_docs = corpus.count()
    term_doc_pairs = corpus.flatMap(
        lambda rec: [(term, rec[0]) for term in set(rec[1])])
    # distinct() keeps a repeated doc ID from being counted twice for a term.
    doc_freqs = (term_doc_pairs
                 .distinct()
                 .map(lambda pair: (pair[0], 1))
                 .reduceByKey(lambda a, b: a + b))
    return doc_freqs.map(lambda kv: (kv[0], float(n_docs) / float(kv[1])))


def tfidf(tokens, idfs):
    """Return {token: count * idf} for the tokens of one record.

    `tokens` is a list of tokens (duplicates meaningful); `idfs` is a mapping
    from token to IDF weight. TF is the raw occurrence count. Tokens missing
    from `idfs` are left out of the result.
    """
    counts = Counter(tokens)
    return {token: freq * idfs[token]
            for token, freq in counts.items()
            if token in idfs}


def norm(weights):
    """Return the Euclidean norm of the values of the `weights` mapping."""
    return math.sqrt(sum(w * w for w in weights.values()))


def invert(record):
    """Turn (record_id, weights) into a list of (token, record_id) pairs."""
    record_id, weights = record[0], record[1]
    return [(token, record_id) for token in weights]


def swap(record):
    """Swap a (token, ids) pair into (ids, token)."""
    token, keys = record[0], record[1]
    return (keys, token)


def fastCosineSimilarity(record):
    """Score one candidate pair using the broadcast weights and norms.

    `record` is ((amazon_id, google_id), tokens), where `tokens` are the
    tokens the two records share. Returns ((amazon_id, google_id), cosine),
    the dot product over the shared tokens divided by the product of the two
    records' norms.
    """
    amazonRec = record[0][0]
    googleRec = record[0][1]
    tokens = record[1]
    amazon_weights = amazonWeightsBroadcast.value[amazonRec]
    google_weights = googleWeightsBroadcast.value[googleRec]
    dot = sum(amazon_weights.get(token, 0) * google_weights.get(token, 0)
              for token in tokens)
    denominator = (amazonNormsBroadcast.value[amazonRec]
                   * googleNormsBroadcast.value[googleRec])
    return ((amazonRec, googleRec), dot / denominator)


# Everything that touches the cluster runs under try/finally so the
# SparkContext is stopped even when a step or the final check fails.
try:
    # Read and parse both catalogs.
    amazon = load_data(amazon_path)
    google = load_data(google_path)

    # Tokenize. Cached because several actions below reuse these RDDs and
    # would otherwise re-read and re-parse the input files each time.
    amazon_rec_to_token = amazon.map(lambda rec: (rec[0], tokenize(rec[1]))).cache()
    google_rec_to_token = google.map(lambda rec: (rec[0], tokenize(rec[1]))).cache()

    # Combined Amazon + Google corpus.
    full_corpus_rdd = amazon_rec_to_token.union(google_rec_to_token)

    # The "full" names refer to the same RDDs as above.
    amazonFullRecToToken = amazon_rec_to_token
    googleFullRecToToken = google_rec_to_token
    print('Amazon full dataset is {} products, Google full dataset is {} products'.format(
        amazonFullRecToToken.count(), googleFullRecToToken.count()))

    # IDF over the combined corpus, collected once and broadcast to workers.
    fullCorpusRDD = full_corpus_rdd
    idfsFull = idfs(fullCorpusRDD)
    idfsFullWeights = idfsFull.collectAsMap()
    # Tokens are unique keys after reduceByKey, so the map size is the count.
    idfsFullCount = len(idfsFullWeights)
    print('There are %s unique tokens in the full datasets.' % idfsFullCount)
    idfsFullBroadcast = sc.broadcast(idfsFullWeights)

    # TF-IDF weights per record.
    amazonWeightsRDD = amazonFullRecToToken.map(
        lambda rec: (rec[0], tfidf(rec[1], idfsFullBroadcast.value))).cache()
    googleWeightsRDD = googleFullRecToToken.map(
        lambda rec: (rec[0], tfidf(rec[1], idfsFullBroadcast.value))).cache()
    print('There are {} Amazon weights and {} Google weights.'.format(
        amazonWeightsRDD.count(), googleWeightsRDD.count()))

    # Norm of each record's weight vector.
    amazonNorms = amazonWeightsRDD.map(lambda rec: (rec[0], norm(rec[1])))
    amazonNormsBroadcast = sc.broadcast(amazonNorms.collectAsMap())
    googleNorms = googleWeightsRDD.map(lambda rec: (rec[0], norm(rec[1])))
    googleNormsBroadcast = sc.broadcast(googleNorms.collectAsMap())

    # Inverted index: (token, record_id).
    amazonInvPairsRDD = amazonWeightsRDD.flatMap(invert).cache()
    googleInvPairsRDD = googleWeightsRDD.flatMap(invert).cache()
    print('There are {} Amazon inverted pairs and {} Google inverted pairs.'.format(
        amazonInvPairsRDD.count(), googleInvPairsRDD.count()))

    # Join on token, then group the shared tokens per (amazon_id, google_id).
    commonTokens = (amazonInvPairsRDD
                    .join(googleInvPairsRDD)
                    .map(swap)
                    .groupByKey()
                    .mapValues(list)
                    .cache())
    print('Found %d common tokens' % commonTokens.count())

    # Cosine similarity for every pair that shares a token.
    amazonWeightsBroadcast = sc.broadcast(amazonWeightsRDD.collectAsMap())
    googleWeightsBroadcast = sc.broadcast(googleWeightsRDD.collectAsMap())

    similaritiesFullRDD = commonTokens.map(fastCosineSimilarity).cache()
    similaritiesCount = similaritiesFullRDD.count()
    print(similaritiesCount)

    # Report the results.
    print("Number of similarity records: {}".format(similaritiesCount))

    # Look up the spot-check pair.
    similarity_test = similaritiesFullRDD.filter(
        lambda pair: pair[0][0] == TEST_AMAZON_ID
        and pair[0][1] == TEST_GOOGLE_URL).collect()
    print("Similarity test results:", similarity_test)
    print("Number of similarity test results:", len(similarity_test))

    # A missing spot-check pair is reported but does not abort the run.
    if not similarity_test:
        print("Warning: No matching records found for the given IDs.")

    # Raised explicitly (not via `assert`) so the check also runs under -O.
    if similaritiesCount != EXPECTED_SIMILARITY_COUNT:
        raise AssertionError("incorrect similaritiesFullRDD.count()")
finally:
    sc.stop()