import re

import matplotlib.pyplot as plt
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"
raw_logs = sc.textFile(logFile)

# Apache Common Log Format: host, identity, user, [timestamp], "method path protocol", status, size
LOG_PATTERN = re.compile(
    r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)'
)


# Parse the day and the requesting host from one log line
def parse_day_ip(line):
    match = LOG_PATTERN.match(line)
    if not match:
        return None
    ip = match.group(1)
    full_time = match.group(4)       # e.g. "10/Oct/2000:13:55:36 -0700"
    day = full_time.split('/')[0]    # e.g. "10"
    return (day, ip)


# 1) Total number of requests per day
def parse_day(line):
    match = LOG_PATTERN.match(line)
    if not match:
        return None
    full_time = match.group(4)
    day = full_time.split('/')[0]
    return (day, 1)


# Total requests per day
dailyRequests = raw_logs.map(parse_day).filter(lambda x: x is not None) \
                        .reduceByKey(lambda a, b: a + b)

# Number of distinct hosts per day (same logic as in section 1.3)
day_ip_pairs = raw_logs.map(parse_day_ip).filter(lambda x: x is not None)
dailyUniqueHosts = day_ip_pairs.distinct() \
                               .map(lambda x: (x[0], 1)) \
                               .reduceByKey(lambda a, b: a + b)

# 2) Join the two RDDs and compute the average (integer division)
dailyReqJoinHost = dailyRequests.join(dailyUniqueHosts)
avgDailyReqPerHost = dailyReqJoinHost.map(
    lambda x: (x[0], x[1][0] // x[1][1])   # (day, total_requests // unique_hosts)
).sortByKey().cache()                      # cache the result for later reuse

# Collect the data and prepare it for plotting
daysWithAvg = []
avgs = []
for day, avg in avgDailyReqPerHost.collect():
    daysWithAvg.append(day)
    avgs.append(avg)

# Draw the line chart
plt.figure(figsize=(10, 6))
plt.plot(daysWithAvg, avgs, marker='o', linestyle='-', color='b',
         label='Average Requests per Host')
plt.xlabel('Day')
plt.ylabel('Average Requests per Host')
plt.title('Average Daily Requests per Host')
plt.xticks(rotation=45)
plt.tight_layout()
plt.legend()
plt.show()

sc.stop()
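
# A minimal sanity check (a sketch, not part of the job above): the sample line
# below is a hypothetical Common Log Format entry used only to illustrate what
# LOG_PATTERN captures. group(4) is the timestamp, and its leading day component
# is the key produced by parse_day() / parse_day_ip(). It uses only `re`, so it
# still runs after sc.stop().
sample = '127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326'
m = LOG_PATTERN.match(sample)
if m:
    print(m.group(1))                 # host: "127.0.0.1"
    print(m.group(4))                 # timestamp: "10/Oct/2000:13:55:36 -0700"
    print(m.group(4).split('/')[0])   # day key: "10"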