diff --git a/2-9.py b/2-9.py
index 298af4b..47d3c0b 100644
--- a/2-9.py
+++ b/2-9.py
@@ -1,74 +1,104 @@
 import re
-from pyspark import SparkContext
+import datetime
+from pyspark.sql import SparkSession, Row
+from pyspark.sql import functions as F
 import matplotlib.pyplot as plt

-# Initialize the SparkContext
-sc = SparkContext.getOrCreate()
+# Regular expression for parsing Apache access log lines
+APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'

-# Regular expression for matching log lines
-LOG_PATTERN = re.compile(
-    r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)'
-)
+# Month abbreviations used when converting Apache timestamps into datetime objects
+month_map = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7,
+             'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}

-# Function that parses a single log line
-def parse_log_line(line):
-    match = LOG_PATTERN.match(line)
-    if not match:
-        return None
-    content_size_str = match.group(9)
-    content_size = int(content_size_str) if content_size_str.isdigit() else 0
+
+def parse_apache_time(s):
+    """Convert Apache time format into a Python datetime object"""
+    return datetime.datetime(int(s[7:11]),
+                             month_map[s[3:6]],
+                             int(s[0:2]),
+                             int(s[12:14]),
+                             int(s[15:17]),
+                             int(s[18:20]))

-    return {
-        'ip': match.group(1),
-        'user_identity': match.group(2),
-        'user_id': match.group(3),
-        'timestamp': match.group(4),
-        'method': match.group(5),
-        'endpoint': match.group(6),
-        'protocol': match.group(7),
-        'status_code': int(match.group(8)),
-        'content_size': content_size
-    }

-# Extract the hour
-def extract_hour(log):
-    timestamp = log['timestamp']
-    hour = timestamp.split(":")[1]  # pull the hour out of the timestamp
-    return hour
+
+def parseApacheLogLine(logline):
+    """Parse a line in the Apache Common Log format"""
+    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
+    if match is None:
+        return (logline, 0)
+    size_field = match.group(9)
+    size = int(size_field) if size_field != '-' else 0
+    return (Row(
+        host=match.group(1),
+        client_identd=match.group(2),
+        user_id=match.group(3),
+        date_time=parse_apache_time(match.group(4)),
+        method=match.group(5),
+        endpoint=match.group(6),
+        protocol=match.group(7),
+        response_code=int(match.group(8)),
+        content_size=size
+    ), 1)

-if __name__ == "__main__":
-    # Load the log file
-    logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"
-    raw_logs = sc.textFile(logFile)

-    # Parse the lines and keep only the valid ones
-    access_logs = raw_logs.map(parse_log_line).filter(lambda x: x is not None).cache()
+def main():
+    # Create the SparkSession
+    spark = SparkSession.builder \
+        .appName("Apache Log Analysis") \
+        .getOrCreate()

-    # Keep only the records with a 404 response code
-    bad_records = access_logs.filter(lambda log: log['status_code'] == 404).cache()
+    # Read the log file
+    logFile = 'apache.access.log.PROJECT'  # replace with the path to your log file
+    rdd = spark.sparkContext.textFile(logFile)

-    # Count the 404 records per hour
-    hourly_404_counts = bad_records.map(lambda log: (extract_hour(log), 1)) \
-        .reduceByKey(lambda a, b: a + b) \
-        .sortByKey()  # sort by hour
+    # Parse the log lines
+    parsed_logs = rdd.map(parseApacheLogLine)

-    # Collect the results into a list
-    hourly_404_counts_list = hourly_404_counts.collect()
+    # Keep only the lines that parsed successfully
+    access_logs = parsed_logs.filter(lambda s: s[1] == 1).map(lambda s: s[0]).cache()

-    # Extract the hours and the matching 404 counts
-    hours = [hour for hour, count in hourly_404_counts_list]
-    counts = [count for hour, count in hourly_404_counts_list]
+    # Convert the RDD of Rows into a DataFrame so column expressions can be used
+    access_logs_df = spark.createDataFrame(access_logs)

-    # Draw a line chart with matplotlib
+    # Keep only the requests with a 404 response code
+    access_logs_404 = access_logs_df.filter(access_logs_df.response_code == 404)
+
+    # Extract the hour of the day
+    access_logs_with_hour = access_logs_404.withColumn("hour", F.hour(access_logs_404.date_time))
+
+    # Count the 404 responses per hour
+    hourly_404_counts = access_logs_with_hour.groupBy("hour").count().orderBy("hour")
+
+    # Collect the data for plotting
+    hourly_counts = hourly_404_counts.collect()
+
+    # Extract the hours and the counts
+    hours = [row["hour"] for row in hourly_counts]
+    counts = [row["count"] for row in hourly_counts]
+
+    # Draw a line chart with Matplotlib
     plt.figure(figsize=(10, 6))
-    plt.plot(hours, counts, marker='o', color='b', linestyle='-', linewidth=2, markersize=6)
-    plt.title('Hourly 404 Response Code Counts')
-    plt.xlabel('Hour')
-    plt.ylabel('Number of 404 Responses')
-    plt.xticks(rotation=45)  # rotate the hour labels 45 degrees
-    plt.tight_layout()
+    plt.plot(hours, counts, marker='o', linestyle='-', color='b', label='404 Responses')
+    plt.title("Hourly 404 Response Code Counts")
+    plt.xlabel("Hour of the Day")
+    plt.ylabel("Count of 404 Responses")
+    plt.xticks(range(24))  # show all 24 hours
+    plt.grid(True)
+    plt.legend()
     plt.show()

-    # Stop Spark
-    sc.stop()
+    # Draw a bar chart with Matplotlib
+    plt.figure(figsize=(10, 6))
+    plt.bar(hours, counts, color='orange', label='404 Responses')
+    plt.title("Hourly 404 Response Code Counts")
+    plt.xlabel("Hour of the Day")
+    plt.ylabel("Count of 404 Responses")
+    plt.xticks(range(24))  # show all 24 hours
+    plt.grid(True)
+    plt.legend()
+    plt.show()
+
+    # Stop the SparkSession
+    spark.stop()
+
+
+if __name__ == "__main__":
+    main()
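As a quick sanity check on the revised log pattern, a minimal sketch like the one below can be run outside Spark; the sample log line is fabricated for illustration only, and the constant name simply mirrors the one used in the patch:

import re

APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'

# A made-up line in Apache Common Log format, used only to exercise the regex.
sample = '127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /images/launch-logo.gif HTTP/1.0" 404 -'

match = re.search(APACHE_ACCESS_LOG_PATTERN, sample)
print(match.groups() if match else 'no match')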