"""Analyze an Apache access log with PySpark: content sizes, response codes,
top hosts, and endpoint hit counts, with matplotlib visualizations."""

import re

import matplotlib.pyplot as plt
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Common Log Format:
#   host identity user [timestamp] "method endpoint protocol" status size
LOG_PATTERN = re.compile(r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)')


def parse_log_line(line):
    """Parse one access-log line into a dict of fields.

    Returns None when the line does not match LOG_PATTERN, so callers can
    filter out malformed lines.
    """
    match = LOG_PATTERN.match(line)
    if not match:
        return None
    # The size field is "-" for responses with no body; treat that as 0 bytes.
    content_size_str = match.group(9)
    content_size = int(content_size_str) if content_size_str.isdigit() else 0
    return {
        'ip': match.group(1),
        'user_identity': match.group(2),
        'user_id': match.group(3),
        'timestamp': match.group(4),
        'method': match.group(5),
        'endpoint': match.group(6),
        'protocol': match.group(7),
        'status_code': int(match.group(8)),
        'content_size': content_size
    }


logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"
raw_logs = sc.textFile(logFile)
# Keep only lines that parsed; cached because every analysis below reuses it.
access_logs = raw_logs.map(parse_log_line).filter(lambda x: x is not None).cache()


def analyze_content_sizes(access_logs):
    """Return {'avg', 'min', 'max'} of response content sizes in bytes.

    Returns all zeros for an empty log. The intermediate RDD is cached for
    the duration of the aggregations and unpersisted afterwards so repeated
    calls do not accumulate cached partitions on the executors.
    """
    content_sizes = access_logs.map(lambda log: log['content_size']).cache()
    try:
        count = content_sizes.count()
        if count == 0:
            return {"avg": 0, "min": 0, "max": 0}
        total = content_sizes.sum()
        avg = total / count
        return {
            "avg": int(avg),
            "min": content_sizes.min(),
            "max": content_sizes.max()
        }
    finally:
        # Fix: the original cached this RDD and never released it.
        content_sizes.unpersist()


def analyze_response_codes(access_logs):
    """Return {status_code: occurrence_count} over all parsed log lines."""
    return dict(access_logs
                .map(lambda log: (log['status_code'], 1))
                .reduceByKey(lambda a, b: a + b)
                .collect())


def plot_response_codes(response_code_counts):
    """Render a pie chart of the HTTP status-code distribution."""
    labels = ["Code {code}".format(code=code) for code in response_code_counts.keys()]
    counts = list(response_code_counts.values())
    total = sum(counts)
    fracs = [cnt / total for cnt in counts]
    plt.figure(figsize=(8, 6))
    plt.pie(fracs, labels=labels, autopct="%1.1f%%", startangle=90)
    plt.title("HTTP Response Code Distribution")
    plt.show()


def find_top_hosts(access_logs, threshold=10):
    """Return up to 20 (ip, count) pairs with more than `threshold` requests.

    Results are ordered busiest-first.
    """
    return (access_logs
            .map(lambda log: (log['ip'], 1))
            .reduceByKey(lambda a, b: a + b)
            .filter(lambda x: x[1] > threshold)
            .takeOrdered(20, key=lambda x: -x[1]))


def plot_endpoint_hits(access_logs):
    """Render a horizontal bar chart of the 50 most-requested endpoints."""
    # Fix: takeOrdered fetches only the top 50 to the driver; the original
    # sortBy(...).collect() shipped every endpoint count to the driver
    # just to slice the first 50.
    top_50 = (access_logs
              .map(lambda log: (log['endpoint'], 1))
              .reduceByKey(lambda a, b: a + b)
              .takeOrdered(50, key=lambda x: -x[1]))
    endpoints = [ep for ep, _ in top_50]
    counts = [cnt for _, cnt in top_50]
    plt.figure(figsize=(14, 6))
    # Reverse so the busiest endpoint renders at the top of the chart.
    plt.barh(endpoints[::-1], counts[::-1])
    plt.title("Top 50 Endpoints by Hits")
    plt.xlabel("Number of Hits")
    plt.ylabel("Endpoint")
    plt.tight_layout()
    plt.show()


if __name__ == "__main__":
    size_stats = analyze_content_sizes(access_logs)
    print("Content Size Analysis:\n"
          "Average: {avg} bytes\n"
          "Minimum: {min} bytes\n"
          "Maximum: {max} bytes".format(
              avg=size_stats['avg'],
              min=size_stats['min'],
              max=size_stats['max']
          ))

    response_code_counts = analyze_response_codes(access_logs)
    print("\nResponse Code Distribution:")
    for code, count in response_code_counts.items():
        print("Code {code}: {count} occurrences".format(code=code, count=count))
    plot_response_codes(response_code_counts)

    top_hosts = find_top_hosts(access_logs)
    print("\nTop Hosts with >10 Requests:")
    for host, count in top_hosts:
        print("{host}: {count} requests".format(host=host, count=count))

    plot_endpoint_hits(access_logs)

    sc.stop()