diff --git a/1-3.py b/1-3.py
new file mode 100644
index 0000000..e239309
--- /dev/null
+++ b/1-3.py
@@ -0,0 +1,46 @@
+import re
+from pyspark import SparkContext
+
+sc = SparkContext.getOrCreate()
+
+LOG_PATTERN = re.compile(r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)')
+
+
+def parse_log_line(line):
+    match = LOG_PATTERN.match(line)
+    if not match:
+        return None
+
+    ip = match.group(1)
+    # Extract the day-of-month from the timestamp, e.g. "10/Oct/2000:13:55:36 -0700" -> "10"
+    full_time = match.group(4)
+    day = full_time.split('/')[0]  # "10"
+
+    return (day, ip)
+
+
+logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"
+raw_logs = sc.textFile(logFile)
+
+# Extract (day, ip) pairs, dropping lines that fail to parse
+date_ip_pairs = raw_logs.map(parse_log_line).filter(lambda x: x is not None)
+
+# Deduplicate the (day, ip) pairs
+unique_daily_hosts = date_ip_pairs.distinct()
+
+# Count the number of unique hosts per day
+daily_host_counts = unique_daily_hosts \
+    .map(lambda pair: (pair[0], 1)) \
+    .reduceByKey(lambda a, b: a + b)
+
+# Cache dailyHosts for reuse downstream
+dailyHosts = daily_host_counts.cache()
+
+# Sort by day ascending and collect; the day-of-month in Apache logs is zero-padded,
+# so string order matches numeric order
+sorted_daily_hosts = dailyHosts.sortByKey().collect()
+
+# Print the results
+for day, count in sorted_daily_hosts:
+    print("Day {0}: {1} unique hosts".format(day, count))
+
+sc.stop()
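
A quick way to sanity-check LOG_PATTERN before submitting the job is to run the parser against a single Apache Common Log Format line locally, without Spark. This is a minimal sketch; the sample line below is a made-up illustrative entry, not taken from apache.access.log.PROJECT:

import re

# Same pattern as in 1-3.py (Apache Common Log Format).
LOG_PATTERN = re.compile(r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)')

# Made-up sample entry (illustrative only, not from the real dataset).
sample = '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326'

m = LOG_PATTERN.match(sample)
assert m is not None, "pattern did not match the sample line"
day = m.group(4).split('/')[0]
print((day, m.group(1)))  # -> ('10', '127.0.0.1'), the same (day, ip) pair parse_log_line would emit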