web-analyze/2-8.py
fly6516 d0d58e7d4b feat: 添加 2-8.py 文件并实现每小时 404 错误统计
- 新增 2-8.py 文件,实现从日志文件中解析和统计每小时的 404 错误数量
- 使用 Spark 进行日志处理和分析
- 添加日志解析函数和提取小时函数
- 实现从 HDFS 读取日志数据并进行过滤和聚合
- 最后输出每小时的 404 错误数量
2025-04-14 03:57:15 +08:00

67 lines
1.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from pyspark import SparkContext
# Initialize the SparkContext (getOrCreate reuses an existing context if one is running)
sc = SparkContext.getOrCreate()
# 日志匹配的正则表达式
# Apache common-log-format pattern: host, identity, user, timestamp,
# request (method / endpoint / protocol), status code and response size.
LOG_PATTERN = re.compile(
    r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)'
)

def parse_log_line(line):
    """Parse one Apache access-log line into a dict of fields.

    Returns None when the line does not match LOG_PATTERN.
    """
    m = LOG_PATTERN.match(line)
    if m is None:
        return None
    host, identity, user, ts, method, endpoint, proto, status, size = m.groups()
    return {
        'ip': host,
        'user_identity': identity,
        'user_id': user,
        'timestamp': ts,
        'method': method,
        'endpoint': endpoint,
        'protocol': proto,
        'status_code': int(status),
        # a non-numeric size (typically "-") means no body was sent; record 0
        'content_size': int(size) if size.isdigit() else 0,
    }
def extract_hour(log):
    """Return the hour-of-day string ("HH") from a parsed log record.

    Timestamps look like "10/Oct/2000:13:55:36 -0700"; the hour is the
    token after the first ':'.
    """
    _date, hour, _rest = log['timestamp'].split(':', 2)
    return hour
if __name__ == "__main__":
    # Source access log on HDFS.
    log_path = "hdfs://master:9000/user/root/apache.access.log.PROJECT"

    # Parse every line and drop the ones that failed to match the pattern.
    parsed_logs = (
        sc.textFile(log_path)
        .map(parse_log_line)
        .filter(lambda rec: rec is not None)
        .cache()
    )

    # Restrict to requests that came back with a 404 status.
    not_found = parsed_logs.filter(lambda rec: rec['status_code'] == 404).cache()

    # Count 404s per hour of day and order the result by hour.
    per_hour_counts = (
        not_found
        .map(lambda rec: (extract_hour(rec), 1))
        .reduceByKey(lambda a, b: a + b)
        .sortByKey()
        .cache()
    )

    # Collect and print the per-hour totals.
    print("每小时的 404 错误数量:")
    for hour, count in per_hour_counts.collect():
        print("小时 {}: {} 次 404 错误".format(hour, count))

    # Shut down Spark.
    sc.stop()