From c48a91f11e6a6d607542559940254ca4796f95ac Mon Sep 17 00:00:00 2001
From: fly6516
Date: Mon, 14 Apr 2025 01:49:05 +0800
Subject: [PATCH] =?UTF-8?q?refactor(1-1.py):=20=E9=87=8D=E6=9E=84=E6=97=A5?=
 =?UTF-8?q?=E5=BF=97=E5=88=86=E6=9E=90=E4=BB=A3=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- 重新编写日志解析逻辑,使用正则表达式匹配日志行
- 添加错误处理和日志文件为空时的处理逻辑
- 优化 Top 10 最常访问的端点统计代码
- 使用 f-string 改进代码可读性
- 添加 SparkContext 初始化和停止逻辑
---
 1-1.py | 49 +++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 43 insertions(+), 6 deletions(-)

diff --git a/1-1.py b/1-1.py
index f08f1af..b27440b 100644
--- a/1-1.py
+++ b/1-1.py
@@ -1,11 +1,48 @@
-from log_analysis_step2 import access_logs
+import re
+from pyspark import SparkContext
 
-not200 = access_logs.filter(lambda log: log.response_code != 200)
+sc = SparkContext.getOrCreate()
 
-endpointCountPairTuple = not200.map(lambda log: (log.endpoint, 1))
+LOG_PATTERN = re.compile(r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)')
 
-endpointSum = endpointCountPairTuple.reduceByKey(lambda a, b : a + b)
-topTenErrURLs = endpointSum.takeOrdered(10, lambda s: -1 * s[1])
+def parse_log_line(line):
+    match = LOG_PATTERN.match(line)
+    if not match:
+        return None
 
-print('Top Ten failed URLs: %s' % topTenErrURLs)
+    content_size_str = match.group(9)
+    content_size = int(content_size_str) if content_size_str.isdigit() else 0
+
+    return {
+        'ip': match.group(1),
+        'user_identity': match.group(2),
+        'user_id': match.group(3),
+        'timestamp': match.group(4),
+        'method': match.group(5),
+        'endpoint': match.group(6),
+        'protocol': match.group(7),
+        'status_code': int(match.group(8)),
+        'content_size': content_size
+    }
+
+
+logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"
+raw_logs = sc.textFile(logFile)
+access_logs = raw_logs.map(parse_log_line).filter(lambda x: x is not None).cache()
+
+# 加入一个保护,防止 access_logs 空时报错
+if access_logs.isEmpty():
+    print("日志文件为空或解析失败")
+else:
+    endpoint_counts = (access_logs
+                       .map(lambda log: (log['endpoint'], 1))
+                       .reduceByKey(lambda a, b: a + b)
+                       .sortBy(lambda x: -x[1])
+                       .take(10))
+
+    print("Top 10 most visited endpoints:")
+    for endpoint, count in endpoint_counts:
+        print(f"{endpoint}: {count} hits")
+
+sc.stop()
 