diff --git a/1-1.py b/1-1.py
index f08f1af..b27440b 100644
--- a/1-1.py
+++ b/1-1.py
@@ -1,11 +1,48 @@
-from log_analysis_step2 import access_logs
+import re
+from pyspark import SparkContext
 
-not200 = access_logs.filter(lambda log: log.response_code != 200)
+sc = SparkContext.getOrCreate()
 
-endpointCountPairTuple = not200.map(lambda log: (log.endpoint, 1))
+LOG_PATTERN = re.compile(r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)')
 
-endpointSum = endpointCountPairTuple.reduceByKey(lambda a, b : a + b)
-topTenErrURLs = endpointSum.takeOrdered(10, lambda s: -1 * s[1])
+def parse_log_line(line):
+    match = LOG_PATTERN.match(line)
+    if not match:
+        return None
 
-print('Top Ten failed URLs: %s' % topTenErrURLs)
+    content_size_str = match.group(9)
+    content_size = int(content_size_str) if content_size_str.isdigit() else 0
+
+    return {
+        'ip': match.group(1),
+        'user_identity': match.group(2),
+        'user_id': match.group(3),
+        'timestamp': match.group(4),
+        'method': match.group(5),
+        'endpoint': match.group(6),
+        'protocol': match.group(7),
+        'status_code': int(match.group(8)),
+        'content_size': content_size
+    }
+
+
+logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"
+raw_logs = sc.textFile(logFile)
+access_logs = raw_logs.map(parse_log_line).filter(lambda x: x is not None).cache()
+
+# Guard against an empty access_logs RDD so the job does not fail when parsing yields nothing
+if access_logs.isEmpty():
+    print("Log file is empty or no lines could be parsed")
+else:
+    endpoint_counts = (access_logs
+                       .map(lambda log: (log['endpoint'], 1))
+                       .reduceByKey(lambda a, b: a + b)
+                       .sortBy(lambda x: -x[1])
+                       .take(10))
+
+    print("Top 10 most visited endpoints:")
+    for endpoint, count in endpoint_counts:
+        print(f"{endpoint}: {count} hits")
+
+sc.stop()
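
Reviewer note: the snippet below is a minimal, hypothetical sanity check for the new parser, not part of the patch. It assumes LOG_PATTERN and parse_log_line from the patch are pasted into a plain Python shell (no Spark needed), and the sample log line is invented for illustration rather than taken from apache.access.log.PROJECT:

    sample = '127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1839'
    parsed = parse_log_line(sample)
    assert parsed is not None
    assert parsed['endpoint'] == '/images/launch-logo.gif'  # group 6 of LOG_PATTERN
    assert parsed['status_code'] == 200                      # group 8, cast to int
    assert parsed['content_size'] == 1839                    # group 9, falls back to 0 when not numeric

If the full shuffle sort in sortBy(...).take(10) ever becomes a concern on larger logs, the takeOrdered call from the old version works the same way on the new pair RDD: .takeOrdered(10, key=lambda x: -x[1]) instead of .sortBy(lambda x: -x[1]).take(10).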