diff --git a/pretest1.py b/pretest1.py index e69de29..f3d6824 100644 --- a/pretest1.py +++ b/pretest1.py @@ -0,0 +1,86 @@ +import re +import datetime +from pyspark.sql import Row +from pyspark import SparkContext +from test_helper import Test + +# 定义月份映射表 +month_map = { + 'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, + 'Jul':7, 'Aug':8, 'Sep':9, 'Oct':10, 'Nov':11, 'Dec':12 +} + +# 解析Apache时间格式: 将Apache时间字符串转换为Python datetime对象 +def parse_apache_time(s): + return datetime.datetime( + int(s[7:11]), # 年 + month_map[s[3:6]], # 月 + int(s[0:2]), # 日 + int(s[12:14]), # 时 + int(s[15:17]), # 分 + int(s[18:20]) # 秒 + ) + +# 正则表达式模式 +APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)' + +# 解析日志行, 返回Row对象或原始行 +def parseApacheLogLine(logline): + match = re.search(APACHE_ACCESS_LOG_PATTERN, logline) + if match is None: + return (logline, 0) # 解析失败 + size_field = match.group(9) + size = 0 if size_field == '-' else int(size_field) + return ( + Row( + host=match.group(1), + client_identd=match.group(2), + user_id=match.group(3), + date_time=parse_apache_time(match.group(4)), + method=match.group(5), + endpoint=match.group(6), + protocol=match.group(7), + response_code=int(match.group(8)), + content_size=size + ), 1 # 解析成功 + ) + +# 初始化Spark上下文 +sc = SparkContext.getOrCreate() + +# 定义日志文件路径 +logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT" + +# 主函数:解析日志并分离成功/失败行 +def parseLogs(): + parsed_logs = ( + sc.textFile(logFile) + .map(parseApacheLogLine) + .cache() + ) + access_logs = ( + parsed_logs.filter(lambda s: s[1] == 1) + .map(lambda s: s[0]) + .cache() + ) + failed_logs = ( + parsed_logs.filter(lambda s: s[1] == 0) + .map(lambda s: s[0]) + ) + failed_logs_count = failed_logs.count() + if failed_logs_count > 0: + print('Number of invalid loglines: {0}'.format(failed_logs_count)) + for line in failed_logs.take(20): + print('Invalid logline: {0}'.format(line)) + print('Read {0} lines, successfully parsed {1} lines, failed to parse {2} lines'.format( + parsed_logs.count(), + access_logs.count(), + failed_logs.count() + )) + return parsed_logs, access_logs, failed_logs + +if __name__ == "__main__": + parsed_logs, access_logs, failed_logs = parseLogs() + Test.assertEquals(failed_logs.count(), 0, 'incorrect failed_logs.count()') + Test.assertEquals(parsed_logs.count(), 3092, 'incorrect parsed_logs.count()') +Test.assertEquals(access_logs.count(), parsed_logs.count(), 'incorrect access_logs.count()')