web-analyze/1-5.py
fly6516 673d4ab1a3 feat(1-5): compute the average number of requests per host per day
- Read the log file from HDFS
- Parse the date and host from each line
- Compute the total requests and the number of unique hosts per day
- Join the results and compute the average
- Print the average number of requests per host for each day
2025-04-14 02:27:07 +08:00

import re
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"
raw_logs = sc.textFile(logFile)
LOG_PATTERN = re.compile(r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)')
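# The pattern targets the Apache Common Log Format. For illustration, a
# matching entry (a made-up sample, not from the dataset) looks like:
#   127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /index.html HTTP/1.0" 200 1839
# group(1) is the host, group(4) the timestamp, group(8) the status code.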
# Parse the day and the host (IP) from a log line
def parse_day_ip(line):
    match = LOG_PATTERN.match(line)
    if not match:
        return None
    ip = match.group(1)
    full_time = match.group(4)
    day = full_time.split('/')[0]  # day-of-month string, e.g. "10"
    return (day, ip)
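# Sketch of the expected return value (the sample line is hypothetical):
#   parse_day_ip('127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET / HTTP/1.0" 200 1839')
#   -> ('01', '127.0.0.1')
# Lines the regex cannot parse yield None and are filtered out below.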
# Step 1: total number of requests per day
def parse_day(line):
    match = LOG_PATTERN.match(line)
    if not match:
        return None
    full_time = match.group(4)
    day = full_time.split('/')[0]
    return (day, 1)
# Total number of requests per day
dailyRequests = raw_logs.map(parse_day).filter(lambda x: x is not None) \
                        .reduceByKey(lambda a, b: a + b)
# Number of distinct hosts per day (same logic as in 1-3)
day_ip_pairs = raw_logs.map(parse_day_ip).filter(lambda x: x is not None)
dailyUniqueHosts = day_ip_pairs.distinct() \
                               .map(lambda x: (x[0], 1)) \
                               .reduceByKey(lambda a, b: a + b)
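# Note (an alternative sketch, not the original approach): day_ip_pairs already
# carries the day, so dailyRequests could be derived from it directly and the
# second pass over raw_logs with parse_day avoided:
#   dailyRequests = day_ip_pairs.map(lambda p: (p[0], 1)).reduceByKey(lambda a, b: a + b)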
# Step 2: join the two RDDs and compute the average with integer division
dailyReqJoinHost = dailyRequests.join(dailyUniqueHosts)
avgDailyReqPerHost = dailyReqJoinHost.map(
    lambda x: (x[0], x[1][0] // x[1][1])  # (day, total_requests // unique_hosts)
).sortByKey().cache()  # cache the result for later reuse
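# Note: integer division truncates the average. A minimal variant (a sketch,
# not part of the original script) that keeps the fractional part:
#   avgFloat = dailyReqJoinHost.map(lambda x: (x[0], x[1][0] / x[1][1])).sortByKey()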
# Print the results
for day, avg in avgDailyReqPerHost.collect():
    print("Day {}: {} requests per host on average".format(day, avg))
sc.stop()