# BD-exp-9/4-1.py
# PySpark text-similarity experiment: TF-IDF weighting + cosine similarity
# between Amazon and Google product descriptions.
import math
import re
from pyspark import SparkContext, Broadcast
from pyspark.sql import SparkSession
# 1. Dot product of two sparse vectors
def dotprod(a, b):
    """Compute the dot product of two sparse vectors stored as dicts.

    Args:
        a (dict): first mapping of token -> weight
        b (dict): second mapping of token -> weight
    Returns:
        float: sum of a[k] * b[k] over keys present in both dicts
    """
    # Iterate the smaller dict and look keys up in the larger one:
    # O(min(n, m)) instead of the naive O(n * m) pairwise key comparison.
    if len(b) < len(a):
        a, b = b, a
    # 0.0 start value keeps the result a float even for disjoint/empty inputs.
    return sum((v * b[k] for k, v in a.items() if k in b), 0.0)
# 2. Vector norm
def norm(a):
    """Return the Euclidean (L2) norm of a sparse vector.

    Args:
        a (dict): mapping of token -> weight
    Returns:
        float: square root of the vector's dot product with itself
    """
    squared_magnitude = dotprod(a, a)
    return math.sqrt(squared_magnitude)
# 3. Cosine similarity
def cossim(a, b):
    """Compute the cosine similarity of two sparse vectors.

    Args:
        a (dict): first mapping of token -> weight
        b (dict): second mapping of token -> weight
    Returns:
        float: cosine similarity in [0, 1] for non-negative weights;
            0.0 when either vector has zero norm (e.g. an empty dict),
            which previously raised ZeroDivisionError
    """
    denominator = norm(a) * norm(b)
    if denominator == 0.0:
        # A zero vector has no direction; define its similarity as 0.
        return 0.0
    return dotprod(a, b) / denominator
# 4. TF-IDF weighting
def tfidf(tokens, idfsDictionary):
    """Calculate TF-IDF values for a token list.

    Args:
        tokens (list): list of tokens
        idfsDictionary (dict): mapping of token -> IDF weight
    Returns:
        dict: mapping of token -> TF-IDF value; tokens absent from
            idfsDictionary get weight 0. Empty input yields an empty
            dict (previously raised ZeroDivisionError).
    """
    if not tokens:
        return {}
    # Raw term counts (avoid shadowing the function name with a local).
    counts = {}
    for token in tokens:
        counts[token] = counts.get(token, 0) + 1
    total = len(tokens)
    # TF = count / total tokens; missing IDF entries default to 0.
    return {token: (count / total) * idfsDictionary.get(token, 0)
            for token, count in counts.items()}
# 5. Cosine similarity between raw strings
def cosineSimilarity(string1, string2, idfsDictionary):
    """Compute the cosine similarity of two strings under TF-IDF weights.

    Args:
        string1 (str): first string
        string2 (str): second string
        idfsDictionary (dict): mapping of token -> IDF weight
    Returns:
        float: cosine similarity of the two TF-IDF weight vectors
    """
    weights1 = tfidf(tokenize(string1), idfsDictionary)
    weights2 = tfidf(tokenize(string2), idfsDictionary)
    return cossim(weights1, weights2)
# 6. Tokenizer: lowercase, strip punctuation, split on whitespace
def tokenize(text):
    """Tokenize a string into a list of lowercase words.

    Args:
        text (str): input string
    Returns:
        list: tokens obtained by lowercasing, deleting every character
            that is neither word character nor whitespace, then
            splitting on whitespace
    """
    lowered = text.lower()
    cleaned = re.sub(r'[^\w\s]', '', lowered)
    return cleaned.split()
# 7. Per-pair similarity for RDD map()
def computeSimilarity(record, idfsDictionary):
    """Compute similarity for one (google, amazon) record pair.

    Args:
        record (tuple): (google record, amazon record), each a
            (key, description) pair
        idfsDictionary (dict): mapping of token -> IDF weight
    Returns:
        tuple: (google URL, amazon ID, cosine similarity value)
    """
    googleRec, amazonRec = record[0], record[1]
    similarity = cosineSimilarity(googleRec[1], amazonRec[1], idfsDictionary)
    return (googleRec[0], amazonRec[0], similarity)
# 8. Gold-standard file parser
def parse_goldfile_line(goldfile_line):
    """Parse one line of the 'golden standard' mapping file.

    Args:
        goldfile_line (str): a line of data
    Returns:
        tuple: ((key, 'gold'), 1) on success, where key joins the two
            comma-separated fields with a space; (line, 0) for the
            header row; (line, -1) for an unparseable line
    """
    GOLDFILE_PATTERN = '^(.+),(.+)'
    match = re.search(GOLDFILE_PATTERN, goldfile_line)
    # Guard clauses: reject malformed lines, then skip the header row.
    if not match:
        print('Invalid goldfile line: {0}'.format(goldfile_line))
        return (goldfile_line, -1)
    if match.group(1) == '"idAmazon"':
        print('Header datafile line: {0}'.format(goldfile_line))
        return (goldfile_line, 0)
    key = '{0} {1}'.format(match.group(1), match.group(2))
    return ((key, 'gold'), 1)
# 9. Per-pair similarity using a broadcast variable for efficiency
def computeSimilarityBroadcast(record, idfsBroadcast):
    """Compute similarity for one record pair, reading IDFs from a Broadcast.

    Args:
        record (tuple): (google record, amazon record), each a
            (key, description) pair
        idfsBroadcast (Broadcast): broadcasted IDF dictionary
    Returns:
        tuple: (google URL, amazon ID, cosine similarity value)
    """
    googleRec, amazonRec = record[0], record[1]
    similarity = cosineSimilarity(googleRec[1], amazonRec[1], idfsBroadcast.value)
    return (googleRec[0], amazonRec[0], similarity)
# Main entry point: set up Spark, broadcast the IDF weights, and score
# every Google/Amazon description pair by cosine similarity.
if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("TextSimilarity") \
        .getOrCreate()
    sc = spark.sparkContext

    # HDFS input paths.
    amazon_path = "hdfs://master:9000/user/root/Amazon_small.csv"
    google_path = "hdfs://master:9000/user/root/Google_small.csv"

    # Placeholder IDF weights; replace with IDFs computed from the corpus.
    idfsDictionary = {
        "hello": 1.2,
        "world": 1.3,
        "goodbye": 1.1,
        "photoshop": 2.5,
        "illustrator": 2.7
    }
    # Broadcast once so every executor shares a single read-only copy.
    idfsBroadcast = sc.broadcast(idfsDictionary)

    # Load the CSV data.
    amazon_data = spark.read.csv(amazon_path, header=True, inferSchema=True)
    google_data = spark.read.csv(google_path, header=True, inferSchema=True)

    # Column names assumed from the datasets; adjust to the real schema.
    amazon_small = amazon_data.select("asin", "description").rdd.map(lambda x: (x[0], x[1]))
    google_small = google_data.select("url", "description").rdd.map(lambda x: (x[0], x[1]))

    # Bug fix: computeSimilarityBroadcast unpacks record[0] as the GOOGLE
    # record, so the cartesian product must be google-first. The previous
    # amazon-first order silently swapped the URL/ID fields in the output.
    cross_small = google_small.cartesian(amazon_small)
    similarities = cross_small.map(lambda x: computeSimilarityBroadcast(x, idfsBroadcast))

    # Bug fix: the collected results were previously discarded; print them.
    for result in similarities.collect():
        print(result)

    # spark.stop() also stops the underlying SparkContext, so a separate
    # sc.stop() is unnecessary.
    spark.stop()