From 6eb33193b83f453e589b369749c6f2ea06a6f84f Mon Sep 17 00:00:00 2001
From: fly6516
Date: Mon, 14 Apr 2025 01:10:45 +0800
Subject: [PATCH] feat(log_analysis_step2.py): add log analysis functionality
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add log parsing, content-size analysis, and response-code analysis
- Implement statistics for the most frequently accessed hosts and endpoints
- Add data visualization charts, including response-code distribution and top endpoints
---
 log_analysis_step2.py | 125 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 125 insertions(+)

diff --git a/log_analysis_step2.py b/log_analysis_step2.py
index e69de29..6105ba7 100644
--- a/log_analysis_step2.py
+++ b/log_analysis_step2.py
@@ -0,0 +1,125 @@
+import re
+import matplotlib.pyplot as plt
+from pyspark import SparkContext
+
+sc = SparkContext.getOrCreate()
+
+LOG_PATTERN = re.compile(r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)')
+
+
+def parse_log_line(line):
+    match = LOG_PATTERN.match(line)
+    if not match:
+        return None
+
+    content_size_str = match.group(9)
+    content_size = int(content_size_str) if content_size_str.isdigit() else 0
+
+    return {
+        'ip': match.group(1),
+        'user_identity': match.group(2),
+        'user_id': match.group(3),
+        'timestamp': match.group(4),
+        'method': match.group(5),
+        'endpoint': match.group(6),
+        'protocol': match.group(7),
+        'status_code': int(match.group(8)),
+        'content_size': content_size
+    }
+
+
+logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"
+raw_logs = sc.textFile(logFile)
+access_logs = raw_logs.map(parse_log_line).filter(lambda x: x is not None).cache()
+
+
+def analyze_content_sizes(access_logs):
+    content_sizes = access_logs.map(lambda log: log['content_size']).cache()
+    count = content_sizes.count()
+
+    if count == 0:
+        return {"avg": 0, "min": 0, "max": 0}
+
+    total = content_sizes.sum()
+    avg = total / count
+    return {
+        "avg": int(avg),
+        "min": content_sizes.min(),
+        "max": content_sizes.max()
+    }
+
+
+def analyze_response_codes(access_logs):
+    return dict(access_logs
+                .map(lambda log: (log['status_code'], 1))
+                .reduceByKey(lambda a, b: a + b)
+                .collect())
+
+
+def plot_response_codes(response_code_counts):
+    labels = ["Code {code}".format(code=code) for code in response_code_counts.keys()]
+    counts = list(response_code_counts.values())
+    total = sum(counts)
+    fracs = [float(cnt) / total for cnt in counts]
+
+    plt.figure(figsize=(8, 6))
+    plt.pie(fracs, labels=labels, autopct="%1.1f%%", startangle=90)
+    plt.title("HTTP Response Code Distribution")
+    plt.show()
+
+
+def find_top_hosts(access_logs, threshold=10):
+    return (access_logs
+            .map(lambda log: (log['ip'], 1))
+            .reduceByKey(lambda a, b: a + b)
+            .filter(lambda x: x[1] > threshold)
+            .takeOrdered(20, key=lambda x: -x[1]))
+
+
+def plot_endpoint_hits(access_logs):
+    endpoint_counts = (access_logs
+                       .map(lambda log: (log['endpoint'], 1))
+                       .reduceByKey(lambda a, b: a + b)
+                       .sortBy(lambda x: -x[1])
+                       .collect())
+
+    top_50 = endpoint_counts[:50]
+    endpoints = [ep for ep, _ in top_50]
+    counts = [cnt for _, cnt in top_50]
+
+    plt.figure(figsize=(14, 6))
+    plt.barh(endpoints[::-1], counts[::-1])
+    plt.title("Top 50 Endpoints by Hits")
+    plt.xlabel("Number of Hits")
+    plt.ylabel("Endpoint")
+    plt.tight_layout()
+    plt.show()
+
+
+if __name__ == "__main__":
+    size_stats = analyze_content_sizes(access_logs)
+
+    print("Content Size Analysis:\n"
+          "Average: {avg} bytes\n"
+          "Minimum: {min} bytes\n"
+          "Maximum: {max} bytes".format(
+        avg=size_stats['avg'],
+        min=size_stats['min'],
+        max=size_stats['max']
+    ))
+
+    response_code_counts = analyze_response_codes(access_logs)
+    print("\nResponse Code Distribution:")
+    for code, count in response_code_counts.items():
+        print("Code {code}: {count} occurrences".format(code=code, count=count))
+
+    plot_response_codes(response_code_counts)
+
+    top_hosts = find_top_hosts(access_logs)
+    print("\nTop Hosts with >10 Requests:")
+    for host, count in top_hosts:
+        print("{host}: {count} requests".format(host=host, count=count))
+
+    plot_endpoint_hits(access_logs)
+
+    sc.stop()
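
Note: plot_response_codes() and plot_endpoint_hits() call plt.show(), which needs an attached display; when the script runs on a headless cluster node the charts will not appear. Below is a minimal sketch of a file-saving alternative, assuming the Agg backend is available and the working directory is writable (the counts and output filename are illustrative, not real results):

    import matplotlib
    matplotlib.use("Agg")  # non-interactive backend; must be selected before pyplot is imported
    import matplotlib.pyplot as plt

    # Placeholder counts standing in for analyze_response_codes() output, not real log data.
    response_code_counts = {200: 5, 304: 2, 404: 1}
    labels = ["Code {code}".format(code=code) for code in response_code_counts]
    counts = list(response_code_counts.values())

    plt.figure(figsize=(8, 6))
    plt.pie(counts, labels=labels, autopct="%1.1f%%", startangle=90)
    plt.title("HTTP Response Code Distribution")
    plt.savefig("response_codes.png", dpi=150)  # write the chart to disk instead of plt.show()
    plt.close()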