from pyspark import SparkContext
import re
import math

# Initialize SparkContext
sc = SparkContext()

# Define paths for the Amazon and Google datasets
amazon_path = "hdfs://master:9000/user/root/Amazon_small.csv"
google_path = "hdfs://master:9000/user/root/Google_small.csv"

# Load the datasets
amazonData = sc.textFile(amazon_path)
googleData = sc.textFile(google_path)

# Tokenize a string: lowercase it and split on non-alphanumeric characters
def tokenize(text):
    return re.findall(r'\w+', text.lower())

# Sample IDF weights dictionary (use a real IDF calculation in actual code;
# see the sketch at the end of this script)
idfsSmallWeights = {"foo": 1.5, "bar": 1.2, "baz": 1.3}

# TF-IDF: raw term frequencies scaled by each token's IDF weight
def tfidf(tokens, idfs):
    tf = {}
    for token in tokens:
        tf[token] = tf.get(token, 0) + 1
    return {token: count * idfs.get(token, 0) for token, count in tf.items()}

# Cosine similarity helpers
def dotprod(a, b):
    return sum(a.get(k, 0) * b.get(k, 0) for k in a if k in b)

def norm(a):
    return math.sqrt(dotprod(a, a))

def cossim(a, b):
    norm_a = norm(a)
    norm_b = norm(b)
    return dotprod(a, b) / (norm_a * norm_b) if norm_a > 0 and norm_b > 0 else 0

# Cosine similarity between two strings under the given IDF weights
def cosineSimilarity(string1, string2, idfsDictionary):
    w1 = tfidf(tokenize(string1), idfsDictionary)
    w2 = tfidf(tokenize(string2), idfsDictionary)
    return cossim(w1, w2)

# Parse the Amazon dataset: split on commas and return (ID, title, description).
# Note: naive comma splitting breaks on quoted fields that contain commas;
# see the csv-module sketch at the end of this script.
def parse_amazon(line):
    parts = line.split(",")
    return (parts[0], parts[1], parts[2])

# Parse the Google dataset: split on commas and return (ID, name, description)
def parse_google(line):
    parts = line.split(",")
    return (parts[0], parts[1], parts[2])

# Process Amazon data: combine title and description into one text field
amazonProcessed = amazonData.map(parse_amazon).map(lambda x: (x[0], x[1] + " " + x[2]))

# Process Google data: combine name and description into one text field
googleProcessed = googleData.map(parse_google).map(lambda x: (x[0], x[1] + " " + x[2]))

# Cartesian join between the Amazon and Google datasets
crossSmall = amazonProcessed.cartesian(googleProcessed)

# Compute similarity for each (Amazon, Google) pair
def computeSimilarity(record):
    amazonRec, googleRec = record
    amazonID, amazonValue = amazonRec
    googleID, googleValue = googleRec
    cs = cosineSimilarity(googleValue, amazonValue, idfsSmallWeights)
    return (googleID, amazonID, cs)

# Compute similarities for all pairs
similarities = crossSmall.map(computeSimilarity)

# Look up the similarity for a specific Amazon ID and Google URL
def similar(amazonID, googleID):
    return similarities.filter(
        lambda record: record[0] == googleID and record[1] == amazonID
    ).collect()

# Test similarity for a specific pair (replace with actual IDs)
similarResult = similar("b000o24l3q",
                        "http://www.google.com/base/feeds/snippets/17242822440574356561")
print("Requested similarity is %s." % similarResult)
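
# --- Sketch: deriving IDF weights from the corpus ---
# The hard-coded idfsSmallWeights above is only a placeholder. A minimal,
# hedged sketch of an actual IDF calculation over the combined corpus follows;
# the names corpusRDD, numDocs, and realIdfWeights are illustrative and not
# part of the original code.
corpusRDD = amazonProcessed.union(googleProcessed)  # (ID, "title description") pairs
numDocs = corpusRDD.count()

realIdfWeights = (
    corpusRDD
    .flatMap(lambda rec: set(tokenize(rec[1])))  # unique tokens per document
    .map(lambda token: (token, 1))
    .reduceByKey(lambda a, b: a + b)             # document frequency per token
    .mapValues(lambda df: float(numDocs) / df)   # IDF(t) = N / DF(t)
    .collectAsMap()
)

# Example usage: the five rarest (highest-IDF) tokens in the corpus
print(sorted(realIdfWeights.items(), key=lambda kv: -kv[1])[:5])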
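
# --- Sketch: more robust CSV parsing ---
# Naive comma splitting breaks when quoted fields contain commas. A hedged
# alternative using Python's standard csv module (parse_csv_line is an
# illustrative helper, not part of the original code):
import csv
from io import StringIO

def parse_csv_line(line):
    # csv.reader understands quoting, so embedded commas stay in their fields
    return next(csv.reader(StringIO(line)))

# Example usage:
# parse_csv_line('"id1","a title, with a comma","a description"')
# -> ['id1', 'a title, with a comma', 'a description']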