diff --git a/4-1.py b/4-1.py
index 4a2ed22..f494baf 100644
--- a/4-1.py
+++ b/4-1.py
@@ -1,71 +1,86 @@
-import re
-import os
 from pyspark import SparkContext
+import re
+import math

-# Initialize SparkContext
-sc = SparkContext(appName="TextAnalysis")
+# Initialize SparkContext
+sc = SparkContext()

-# Define the data file paths
-GOOGLE_PATH = 'Google.csv'
-GOOGLE_SMALL_PATH = 'Google_small.csv'
-AMAZON_PATH = 'Amazon.csv'
-AMAZON_SMALL_PATH = 'Amazon_small.csv'
-STOPWORDS_PATH = 'stopwords.txt'
+# Define paths for the Amazon and Google datasets
+amazon_path = "hdfs://master:9000/user/root/Amazon_small.csv"
+google_path = "hdfs://master:9000/user/root/Google_small.csv"

-# Regular expression pattern used to parse data lines
-DATAFILE_PATTERN = '^(.+),"(.+)",(.*),(.*),(.*)'
+# Load the datasets
+amazonData = sc.textFile(amazon_path)
+googleData = sc.textFile(google_path)

-def removeQuotes(s):
-    """ Remove quotation marks from the input string """
-    return ''.join(i for i in s if i!='"')
+# Define a function to tokenize a string (splitting on non-alphanumeric characters)
+def tokenize(text):
+    return re.findall(r'\w+', text.lower())

-def parseDatafileLine(datafileLine):
-    """ Parse each line of the data file """
-    match = re.search(DATAFILE_PATTERN, str(datafileLine))
-    if match is None:
-        print('Invalid datafile line: %s' % datafileLine)
-        return (datafileLine, -1)
-    elif match.group(1) == '"id"':
-        print('Header datafile line: %s' % datafileLine)
-        return (datafileLine, 0)
-    else:
-        product = '%s %s %s' % (match.group(2), match.group(3), match.group(4))
-        return ((removeQuotes(match.group(1)), product), 1)
+# Sample IDF weights dictionary (use a real IDF calculation in actual code)
+idfsSmallWeights = {"foo": 1.5, "bar": 1.2, "baz": 1.3}

-def parseData(filename):
-    """ Parse the data file """
-    return (sc
-            .textFile(filename, 4, 0)
-            .map(parseDatafileLine)
-            .cache())
+# TF-IDF function: raw term counts weighted by the supplied IDF values
+def tfidf(tokens, idfs):
+    tf = {}
+    for token in tokens:
+        tf[token] = tf.get(token, 0) + 1
+    tfidf_values = {token: tf[token] * idfs.get(token, 0) for token in tf}
+    return tfidf_values

-def loadData(path):
-    """ Load the data file """
-    filename = path
-    raw = parseData(filename).cache()
-    failed = (raw
-              .filter(lambda s: s[1] == -1)
-              .map(lambda s: s[0]))
-    for line in failed.take(1):
-        print ('{0} - Invalid datafile line: {1}'.format(path, line))
-    valid = (raw
-             .filter(lambda s: s[1] == 1)
-             .map(lambda s: s[0])
-             .cache())
-    print ('{0} - Read {1} lines, successfully parsed {2} lines, failed to parse {3} lines'.format(path, raw.count(), valid.count(), failed.count()))
-    return valid
+# Cosine similarity helpers
+def dotprod(a, b):
+    return sum(a.get(k, 0) * b.get(k, 0) for k in a if k in b)

-# Load the data
-googleSmall = loadData(GOOGLE_SMALL_PATH)
-google = loadData(GOOGLE_PATH)
-amazonSmall = loadData(AMAZON_SMALL_PATH)
-amazon = loadData(AMAZON_PATH)
+def norm(a):
+    return math.sqrt(dotprod(a, a))

-# Print a few records for inspection
-for line in googleSmall.take(3):
-    print ('google: %s: %s\n' % (line[0], line[1]))
+def cossim(a, b):
+    return dotprod(a, b) / (norm(a) * norm(b)) if norm(a) > 0 and norm(b) > 0 else 0

-for line in amazonSmall.take(3):
-    print ('amazon: %s: %s\n' % (line[0], line[1]))
+# Calculate the cosine similarity between two strings
+def cosineSimilarity(string1, string2, idfsDictionary):
+    w1 = tfidf(tokenize(string1), idfsDictionary)
+    w2 = tfidf(tokenize(string2), idfsDictionary)
+    return cossim(w1, w2)
+
+# Parse the Amazon dataset: split on commas and extract title and description
+def parse_amazon(line):
+    parts = line.split(",")
+    return (parts[0], parts[1], parts[2])  # Returning ID, title, and description
+
+# Parse the Google dataset: split on commas and extract name and description
+def parse_google(line):
+    parts = line.split(",")
+    return (parts[0], parts[1], parts[2])  # Returning ID, name, and description
+
+# Process Amazon data
+amazonProcessed = amazonData.map(parse_amazon).map(lambda x: (x[0], x[1] + " " + x[2]))  # Combine title and description
+# Process Google data
+googleProcessed = googleData.map(parse_google).map(lambda x: (x[0], x[1] + " " + x[2]))  # Combine name and description
+
+# Cartesian join between the Amazon and Google datasets
+crossSmall = amazonProcessed.cartesian(googleProcessed)
+
+# Compute the similarity for one (Amazon, Google) pair from the cartesian product
+def computeSimilarity(record):
+    amazonRec = record[0]
+    googleRec = record[1]
+    amazonID = amazonRec[0]
+    googleID = googleRec[0]
+    amazonValue = amazonRec[1]
+    googleValue = googleRec[1]
+    cs = cosineSimilarity(googleValue, amazonValue, idfsSmallWeights)
+    return (googleID, amazonID, cs)
+
+# Compute similarities for all pairs
+similarities = crossSmall.map(computeSimilarity)
+
+# Look up the similarity for a specific Amazon ID and Google URL
+def similar(amazonID, googleID):
+    return similarities.filter(lambda record: (record[0] == googleID and record[1] == amazonID)).collect()
+
+# Test the similarity for a specific pair (replace with actual IDs)
+similarResult = similar("b000o24l3q", "http://www.google.com/base/feeds/snippets/17242822440574356561")
+print("Requested similarity is %s." % similarResult)
-# Assuming the data has now been loaded correctly, you can proceed with the subsequent analysis
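
Note: the patch hard-codes idfsSmallWeights as a placeholder, and its own comment says to use a real IDF calculation in actual code. Below is a minimal sketch of what that could look like, assuming the tokenize function and the amazonProcessed/googleProcessed RDDs from the patch, and the unlogged weighting IDF(t) = N / n(t), where N is the total number of records and n(t) is the number of records containing token t (many definitions apply a log; use whichever the downstream weights expect):

def idfs(corpus):
    """corpus: RDD of (id, text) pairs; returns an RDD of (token, IDF weight)."""
    n = corpus.count()
    # Count each token at most once per record, then count records per token
    unique_tokens = corpus.flatMap(lambda rec: set(tokenize(rec[1])))
    token_df = unique_tokens.map(lambda t: (t, 1)).reduceByKey(lambda a, b: a + b)
    return token_df.map(lambda pair: (pair[0], float(n) / pair[1]))

# For example, computed over both corpora and collected to the driver:
# idfsSmallWeights = idfs(amazonProcessed.union(googleProcessed)).collectAsMap()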