"""Average number of requests per host for each day of an Apache access log.

The script reads the access log from HDFS, extracts a ``(day, host)`` pair
from every line that matches ``LOG_PATTERN``, counts the total requests and
the distinct hosts per day, joins the two counts and prints their integer
quotient for each day.

Provenance: reconstructed from the patch "feat(1-5): compute the average
number of requests per host per day" (commit
673d4ab1a3f839bd3988a85a3d23a3d6bd894a40, author fly6516,
Mon, 14 Apr 2025 02:27:07 +0800), which added this script as ``1-5.py``.
"""
import re

logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"

# Groups: 1 host, 2-3 two whitespace-free tokens, 4 bracketed timestamp,
# 5-7 the three parts of the quoted request, 8 three-digit status,
# 9 final whitespace-free token.
LOG_PATTERN = re.compile(
    r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] '
    r'"(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)'
)


def parse_day_ip(line):
    """Extract the day and the requesting host from one log line.

    Args:
        line: A single raw line of the access log.

    Returns:
        A ``(day, host)`` tuple of strings, where ``day`` is the text before
        the first ``/`` of the bracketed timestamp (e.g. ``"10"``) and
        ``host`` is the first whitespace-delimited field of the line.
        ``None`` if the line does not match ``LOG_PATTERN``.
    """
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    host = match.group(1)
    timestamp = match.group(4)
    # NOTE: only the leading day field is kept, so a log that spans several
    # months would merge equal day numbers into a single key.
    day = timestamp.split('/')[0]
    return (day, host)


def parse_day(line):
    """Map one log line to a ``(day, 1)`` pair for per-day request counting.

    Args:
        line: A single raw line of the access log.

    Returns:
        ``(day, 1)`` with the same ``day`` string that ``parse_day_ip``
        yields, or ``None`` if the line does not match ``LOG_PATTERN``.
    """
    pair = parse_day_ip(line)
    if pair is None:
        return None
    return (pair[0], 1)


def main(log_file=logFile):
    """Run the Spark job and print the average requests per host per day.

    Args:
        log_file: URI of the access log to read; defaults to ``logFile``.
    """
    # Imported here so the parsing helpers above stay importable (and
    # testable) on a machine without PySpark installed.
    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()
    try:
        # Parse every line exactly once. Both aggregations below start from
        # these pairs, so they are cached instead of re-reading the file and
        # re-running the regex for each aggregation.
        day_host_pairs = (
            sc.textFile(log_file)
            .map(parse_day_ip)
            .filter(lambda pair: pair is not None)
            .cache()
        )

        # Total number of requests per day.
        daily_requests = (
            day_host_pairs
            .map(lambda pair: (pair[0], 1))
            .reduceByKey(lambda a, b: a + b)
        )

        # Number of distinct hosts per day.
        daily_unique_hosts = (
            day_host_pairs
            .distinct()
            .map(lambda pair: (pair[0], 1))
            .reduceByKey(lambda a, b: a + b)
        )

        # (day, (total_requests, unique_hosts)) -> (day, floor average).
        # Integer division is intentional. The keys are strings, so the
        # ordering is lexical -- presumably the day field is zero-padded,
        # which would make that equal to numeric order; TODO confirm.
        avg_daily_req_per_host = (
            daily_requests
            .join(daily_unique_hosts)
            .mapValues(lambda counts: counts[0] // counts[1])
            .sortByKey()
        )

        for day, avg in avg_daily_req_per_host.collect():
            print("Day {}: 平均每个主机请求 {} 次".format(day, avg))
    finally:
        # Stop the context even when an action above fails.
        sc.stop()


if __name__ == "__main__":
    main()