import re from pyspark import SparkContext sc = SparkContext.getOrCreate() LOG_PATTERN = re.compile(r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)') def parse_log_line(line): match = LOG_PATTERN.match(line) if not match: return None ip = match.group(1) # 提取日期字符串的“日”部分,格式如:10/Oct/2000:13:55:36 -0700 full_time = match.group(4) day = full_time.split('/')[0] # "10" return (day, ip) logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT" raw_logs = sc.textFile(logFile) # 提取日期与 IP,过滤掉解析失败的行 date_ip_pairs = raw_logs.map(parse_log_line).filter(lambda x: x is not None) # 去重 (day, ip) 对 unique_daily_hosts = date_ip_pairs.distinct() # 按天统计唯一主机数 daily_host_counts = unique_daily_hosts \ .map(lambda pair: (pair[0], 1)) \ .reduceByKey(lambda a, b: a + b) # 缓存 dailyHosts 以便后续使用 dailyHosts = daily_host_counts.cache() # 按日期升序排序并收集结果 sorted_daily_hosts = dailyHosts.sortByKey().collect() # 打印结果 for day, count in sorted_daily_hosts: print("Day {0}: {1} unique hosts".format(day, count)) sc.stop()