web-analyze/2-7.py
fly6516 d67642da65 refactor(2-7.py): remove unnecessary code and simplify the logic for finding the five days with the most 404 errors
- Removed the unnecessary code that printed the top five days and their 404 error records
- Removed the unnecessary Spark shutdown code
- Simplified the logic for finding the five days with the most 404 errors
2025-04-14 03:54:42 +08:00

61 lines
1.7 KiB
Python

import re
from pyspark import SparkContext
# Initialize the SparkContext
sc = SparkContext.getOrCreate()
# Regular expression for matching Apache access log lines
LOG_PATTERN = re.compile(
    r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)'
)
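# Illustrative sample line (assumed format, not taken from the PROJECT file)
# in Apache Common Log Format, which the pattern above is written to match:
# 127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1839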
# Parse one log line into a dict of named fields; return None on no match
def parse_log_line(line):
    match = LOG_PATTERN.match(line)
    if not match:
        return None
    content_size_str = match.group(9)
    content_size = int(content_size_str) if content_size_str.isdigit() else 0
    return {
        'ip': match.group(1),
        'user_identity': match.group(2),
        'user_id': match.group(3),
        'timestamp': match.group(4),
        'method': match.group(5),
        'endpoint': match.group(6),
        'protocol': match.group(7),
        'status_code': int(match.group(8)),
        'content_size': content_size
    }
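# For the illustrative sample line above, parse_log_line would return:
# {'ip': '127.0.0.1', 'user_identity': '-', 'user_id': '-',
#  'timestamp': '01/Aug/1995:00:00:01 -0400', 'method': 'GET',
#  'endpoint': '/images/launch-logo.gif', 'protocol': 'HTTP/1.0',
#  'status_code': 200, 'content_size': 1839}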
def extract_day(log):
    # Timestamps look like 10/Oct/2000:13:55:36 -0700
    full_date = log['timestamp']
    day = full_date.split('/')[0]  # keep only the day of month
    return day
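# Note: extract_day keeps only the day of month (e.g. '10'), so counts for the
# same day in different months would be merged; this is harmless only if the
# log covers a single month (an assumption about the dataset).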
if __name__ == "__main__":
    # Load the log file
    logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"
    raw_logs = sc.textFile(logFile)
    # Parse the lines and keep only the valid ones
    access_logs = raw_logs.map(parse_log_line).filter(lambda x: x is not None).cache()
    # Keep only the logs with a 404 status code
    error_404_logs = access_logs.filter(lambda log: log['status_code'] == 404).cache()
    # Count 404 errors per day
    errDateSorted = (
        error_404_logs
        .map(lambda log: (extract_day(log), 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda x: x[1], ascending=False)  # sort by count, descending
        .cache()
    )
    # Take the five days with the most 404 errors
    top_5_days = errDateSorted.take(5)
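    # A hedged alternative sketch (not in the original file): takeOrdered can
    # pick the top five directly from the unsorted per-day counts, skipping
    # the full sortBy pass:
    # top_5_days = (
    #     error_404_logs
    #     .map(lambda log: (extract_day(log), 1))
    #     .reduceByKey(lambda a, b: a + b)
    #     .takeOrdered(5, key=lambda x: -x[1])
    # )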