import re
import datetime

from pyspark.sql import Row
from pyspark import SparkContext
from test_helper import Test

# Map three-letter English month abbreviations to month numbers.
month_map = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
    'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
}


def parse_apache_time(s):
    """Convert an Apache log timestamp string to a datetime object.

    Expects the fixed-width layout 'dd/Mon/yyyy:hh:mm:ss +zzzz'
    (e.g. '01/Aug/1995:00:00:01 -0400').  The numeric timezone offset
    is not parsed, so the result is a naive datetime.

    Args:
        s: timestamp string in Apache access-log format.

    Returns:
        datetime.datetime built from the date/time fields of ``s``.
    """
    return datetime.datetime(
        int(s[7:11]),       # year
        month_map[s[3:6]],  # month (abbreviation -> number)
        int(s[0:2]),        # day
        int(s[12:14]),      # hour
        int(s[15:17]),      # minute
        int(s[18:20])       # second
    )


# Regex for one Apache access-log line:
# host ident user [date] "method endpoint protocol" status size
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)'


def parseApacheLogLine(logline):
    """Parse a single access-log line.

    Args:
        logline: raw text of one log line.

    Returns:
        (Row, 1) on success, where the Row carries the parsed fields;
        (original line, 0) when the line does not match the pattern.
    """
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        return (logline, 0)  # parse failure: keep the raw line for diagnostics
    # A '-' content size means "no body"; treat it as 0 bytes.
    size_field = match.group(9)
    size = 0 if size_field == '-' else int(size_field)
    return (
        Row(
            host=match.group(1),
            client_identd=match.group(2),
            user_id=match.group(3),
            date_time=parse_apache_time(match.group(4)),
            method=match.group(5),
            endpoint=match.group(6),
            protocol=match.group(7),
            response_code=int(match.group(8)),
            content_size=size
        ),
        1  # parse success
    )


# Spark context and input location (module-level so parseLogs can use them).
sc = SparkContext.getOrCreate()
logFile = "hdfs://master:9000/user/root/apache.access.log.PROJECT"


def parseLogs():
    """Read the log file and split it into parsed and failed lines.

    Returns:
        Tuple of three RDDs: ``parsed_logs`` of (payload, flag) pairs,
        ``access_logs`` of successfully parsed Rows, and ``failed_logs``
        of raw lines that failed to parse.
    """
    parsed_logs = (
        sc.textFile(logFile)
        .map(parseApacheLogLine)
        .cache()
    )
    access_logs = (
        parsed_logs.filter(lambda s: s[1] == 1)
        .map(lambda s: s[0])
        .cache()
    )
    # Cache failed_logs: it is counted below and again by the caller's
    # tests; without caching, every count() replays the whole lineage.
    failed_logs = (
        parsed_logs.filter(lambda s: s[1] == 0)
        .map(lambda s: s[0])
        .cache()
    )

    # Trigger the failed-line count exactly once and reuse the result.
    failed_logs_count = failed_logs.count()
    if failed_logs_count > 0:
        print('Number of invalid loglines: {0}'.format(failed_logs_count))
        for line in failed_logs.take(20):
            print('Invalid logline: {0}'.format(line))

    print('Read {0} lines, successfully parsed {1} lines, failed to parse {2} lines'.format(
        parsed_logs.count(), access_logs.count(), failed_logs_count
    ))
    return parsed_logs, access_logs, failed_logs


if __name__ == "__main__":
    parsed_logs, access_logs, failed_logs = parseLogs()
# Sanity checks on the parsed RDDs (expected counts for this dataset).
# NOTE(review): these reference the RDDs produced by the __main__ block
# above, so they are only meaningful when the file is run as a script.
parsed_count = parsed_logs.count()
Test.assertEquals(failed_logs.count(), 0, 'incorrect failed_logs.count()')
Test.assertEquals(parsed_count, 3092, 'incorrect parsed_logs.count()')
Test.assertEquals(access_logs.count(), parsed_count, 'incorrect access_logs.count()')