import re
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Apache Common Log Format fields captured by the pattern below:
# host, client identity, remote user, [timestamp], "method resource protocol",
# HTTP status code, and response size.
LOG_PATTERN = re.compile(r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)')


def parse_log_line(line):
    """Parse one access log line into a (day, ip) pair, or None if it does not match."""
    match = LOG_PATTERN.match(line)
    if not match:
        return None

    ip = match.group(1)
    # Extract the "day" part of the timestamp, which looks like: 10/Oct/2000:13:55:36 -0700
    full_time = match.group(4)
    day = full_time.split('/')[0]  # "10"

    return (day, ip)
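

# Quick, illustrative sanity check on the driver (not part of the original job):
# the sample line is the canonical Apache Common Log Format example, not a line
# from the PROJECT log. It should parse to day "10" and host "127.0.0.1".
sample_line = '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326'
assert parse_log_line(sample_line) == ("10", "127.0.0.1")
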
logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"
raw_logs = sc.textFile(logFile)

# Extract (day, ip) pairs and filter out lines that fail to parse
date_ip_pairs = raw_logs.map(parse_log_line).filter(lambda x: x is not None)

# Deduplicate the (day, ip) pairs so each host counts once per day
unique_daily_hosts = date_ip_pairs.distinct()

# Count the number of unique hosts per day
daily_host_counts = unique_daily_hosts \
    .map(lambda pair: (pair[0], 1)) \
    .reduceByKey(lambda a, b: a + b)

# Cache dailyHosts so it can be reused later
dailyHosts = daily_host_counts.cache()

# Sort by day in ascending order and collect the results
sorted_daily_hosts = dailyHosts.sortByKey().collect()
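# Note: the day keys are strings, so the sort above is lexicographic; that matches
# chronological order as long as the log uses zero-padded days (e.g. "01"), which
# is the usual Common Log Format convention.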

# Print the results
for day, count in sorted_daily_hosts:
    print("Day {0}: {1} unique hosts".format(day, count))
sc.stop()
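
# A possible way to launch this job on the cluster (the script file name and the
# master URL below are assumptions for illustration, not taken from the project):
#
#   spark-submit --master spark://master:7077 daily_unique_hosts.py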