web-analyze/1-1.py
fly6516 28d60018af fix(1-1.py): optimize log parsing and empty-RDD handling
- Refactor the log-parsing logic for readability
- Switch the empty-RDD check to count() instead of isEmpty()
- Change output formatting to str.format() instead of f-strings
2025-04-14 01:51:33 +08:00
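
The second bullet trades isEmpty() for an explicit count() == 0 test; a minimal
sketch of both checks on a toy RDD (illustrative only, not part of the file below):

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()
    rdd = sc.parallelize([])       # deliberately empty RDD

    print(rdd.isEmpty())           # True; stops as soon as it fails to find a first element
    print(rdd.count() == 0)        # True; counts every element, scanning the whole RDD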

import re
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Apache Common Log Format: host, identity, user, [timestamp], "method endpoint protocol", status, size
LOG_PATTERN = re.compile(r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)')
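# Illustrative line this pattern matches (a made-up example, not from the project dataset):
#   127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /images/logo.gif HTTP/1.0" 200 1839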
def parse_log_line(line):
    """Parse one access-log line into a dict, or return None if it does not match."""
    match = LOG_PATTERN.match(line)
    if not match:
        return None
    # The size field is "-" when no body was sent; treat that as 0 bytes
    content_size_str = match.group(9)
    content_size = int(content_size_str) if content_size_str.isdigit() else 0
    return {
        'ip': match.group(1),
        'user_identity': match.group(2),
        'user_id': match.group(3),
        'timestamp': match.group(4),
        'method': match.group(5),
        'endpoint': match.group(6),
        'protocol': match.group(7),
        'status_code': int(match.group(8)),
        'content_size': content_size
    }
logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"
raw_logs = sc.textFile(logFile)
parsed_logs = raw_logs.map(parse_log_line).filter(lambda x: x is not None)
access_logs = parsed_logs.cache()
# 判断是否为空 RDD兼容性写法count() == 0
if access_logs.count() == 0:
print("日志文件为空或格式不匹配")
else:
    # Count hits per endpoint, sort descending, and keep the top ten
    endpoint_counts = (access_logs
                       .map(lambda log: (log['endpoint'], 1))
                       .reduceByKey(lambda a, b: a + b)
                       .sortBy(lambda x: -x[1])
                       .take(10))
    print("Top 10 most visited endpoints:")
    for endpoint, count in endpoint_counts:
        print("{0}: {1} hits".format(endpoint, count))
sc.stop()
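
A usage sketch: on a node with Spark installed and HDFS reachable at master:9000,
the job can be launched with spark-submit (the path shown is this file's repo path):

    spark-submit web-analyze/1-1.py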