feat(1-5): compute the daily average number of requests per host

- Read the log file from HDFS
- Parse the date and host fields
- Compute the total number of requests and the number of unique hosts per day
- Join the two results and compute the average
- Print the average number of requests per host for each day
fly6516 2025-04-14 02:27:07 +08:00
parent 60c3a1e4ca
commit 673d4ab1a3

1-5.py (new file, 51 lines)

@@ -0,0 +1,51 @@
import re

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Apache access log stored in HDFS
logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"
raw_logs = sc.textFile(logFile)

# Regex for Apache Common Log Format entries
LOG_PATTERN = re.compile(r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)')
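# Illustrative example of a line this pattern matches (hypothetical, not taken from the dataset):
#   in24.inetnebr.com - - [10/Aug/1995:00:00:01 -0400] "GET /index.html HTTP/1.0" 200 1839
# group(1) = host, group(4) = timestamp, group(8) = HTTP status, group(9) = content size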
# Parse one log line into (day, host)
def parse_day_ip(line):
    match = LOG_PATTERN.match(line)
    if not match:
        return None
    ip = match.group(1)
    full_time = match.group(4)
    day = full_time.split('/')[0]  # day of month, e.g. "10"
    return (day, ip)
# (1) Parse one log line into (day, 1) for per-day request counting
def parse_day(line):
    match = LOG_PATTERN.match(line)
    if not match:
        return None
    full_time = match.group(4)
    day = full_time.split('/')[0]
    return (day, 1)
# 得到每天总请求数
dailyRequests = raw_logs.map(parse_day).filter(lambda x: x is not None) \
.reduceByKey(lambda a, b: a + b)
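# Illustrative trace (hypothetical data): [("10", 1), ("10", 1), ("11", 1)]
#   -> reduceByKey -> [("10", 2), ("11", 1)]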
# Number of distinct hosts per day (same logic as in 1-3)
day_ip_pairs = raw_logs.map(parse_day_ip).filter(lambda x: x is not None)
dailyUniqueHosts = day_ip_pairs.distinct() \
                               .map(lambda x: (x[0], 1)) \
                               .reduceByKey(lambda a, b: a + b)
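# Illustrative trace (hypothetical data): [("10", "h1"), ("10", "h1"), ("10", "h2")]
#   -> distinct -> [("10", "h1"), ("10", "h2")] -> map -> [("10", 1), ("10", 1)]
#   -> reduceByKey -> [("10", 2)]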
# (2) Join the two RDDs and compute the average with integer division
dailyReqJoinHost = dailyRequests.join(dailyUniqueHosts)
avgDailyReqPerHost = dailyReqJoinHost.map(
    lambda x: (x[0], x[1][0] // x[1][1])  # (day, total_requests // unique_hosts)
).sortByKey().cache()  # cache the result for reuse later
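# Illustrative trace (hypothetical numbers): ("10", 300) joined with ("10", 60)
#   -> ("10", (300, 60)) -> ("10", 300 // 60) = ("10", 5)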
# Print the result
for day, avg in avgDailyReqPerHost.collect():
    print("Day {}: average requests per host {}".format(day, avg))
sc.stop()
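
A quick local sanity check of the same pipeline (a minimal sketch: it reuses LOG_PATTERN, parse_day and parse_day_ip from the script above, and the three sample log lines and host names are made up rather than taken from apache.access.log.PROJECT):

# Local sanity check on in-memory sample lines (hypothetical data)
sc = SparkContext.getOrCreate()
sample = sc.parallelize([
    'host1 - - [10/Aug/1995:00:00:01 -0400] "GET /a.html HTTP/1.0" 200 100',
    'host2 - - [10/Aug/1995:00:00:02 -0400] "GET /b.html HTTP/1.0" 200 200',
    'host1 - - [11/Aug/1995:00:00:03 -0400] "GET /c.html HTTP/1.0" 200 300',
])
reqs = sample.map(parse_day).filter(lambda x: x is not None).reduceByKey(lambda a, b: a + b)
hosts = (sample.map(parse_day_ip).filter(lambda x: x is not None)
               .distinct().map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b))
print(reqs.join(hosts).map(lambda x: (x[0], x[1][0] // x[1][1])).sortByKey().collect())
# Expected output: [('10', 1), ('11', 1)]
sc.stop()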