web-analyze/1-3.py

47 lines
1.2 KiB
Python
Raw Permalink Normal View History

import re
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
LOG_PATTERN = re.compile(r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)')
def parse_log_line(line):
match = LOG_PATTERN.match(line)
if not match:
return None
ip = match.group(1)
# 提取日期字符串的“日”部分格式如10/Oct/2000:13:55:36 -0700
full_time = match.group(4)
day = full_time.split('/')[0] # "10"
return (day, ip)
logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"
raw_logs = sc.textFile(logFile)
# 提取日期与 IP过滤掉解析失败的行
date_ip_pairs = raw_logs.map(parse_log_line).filter(lambda x: x is not None)
# 去重 (day, ip) 对
unique_daily_hosts = date_ip_pairs.distinct()
# 按天统计唯一主机数
daily_host_counts = unique_daily_hosts \
.map(lambda pair: (pair[0], 1)) \
.reduceByKey(lambda a, b: a + b)
# 缓存 dailyHosts 以便后续使用
dailyHosts = daily_host_counts.cache()
# 按日期升序排序并收集结果
sorted_daily_hosts = dailyHosts.sortByKey().collect()
# 打印结果
for day, count in sorted_daily_hosts:
print("Day {0}: {1} unique hosts".format(day, count))
sc.stop()