Time: 2021-07-01 10:21:17
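First, the motivation (the pain points) for building this:
We run quite a few sites. They sit behind a CDN, with an F5 in front of the origin servers, and the traffic that actually reaches the origin is about 50 million PV per day. Here are some of the problems we run into regularly:
Now the idea behind it: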
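It is fairly simple: Python's re module parses each log line with a regular expression to extract the uri, args, timestamp, status code, response size, response time, user IP, CDN IP, server name and so on, and the results are stored in a database.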
Of course, a consistent log format is a prerequisite.
My nginx log format is as follows:
log_format access '$remote_addr - [$time_local] "$request" '
                  '$status $body_bytes_sent $request_time "$http_referer" '
                  '"$http_user_agent" - $http_x_forwarded_for';
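How the log analysis works:
Using Python's re module, write a regex that follows the application server's log format. For my log format, for example, the regex is as follows (when writing it, keep it on a single line first and make sure every space and quote matches the log format exactly; only split it across lines for readability at the end):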
log_pattern = (r'^(?P<remote_addr>.*?) - \[(?P<time_local>.*?)\] "(?P<request>.*?)"'
               r' (?P<status>.*?) (?P<body_bytes_sent>.*?) (?P<request_time>.*?)'
               r' "(?P<http_referer>.*?)" "(?P<http_user_agent>.*?)" - (?P<http_x_forwarded_for>.*)$')
log_pattern_obj = re.compile(log_pattern)
This regex matches an entire log line in one go; each field can then be read with log_pattern_obj.search(log).group('remote_addr'), log_pattern_obj.search(log).group('body_bytes_sent') and so on.
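A minimal runnable sketch of this step, assuming the log_format above. The sample line is made up to fit that format (the IPs are borrowed from the database row shown later), and the helper name parse() is hypothetical. It calls search() once and reads every field from the same match object via groupdict(), which avoids re-running the regex for each group; it also applies the same time_local conversion the script performs.

```python
import re
import time

# Regex for the nginx log_format above (same pattern as in the article)
log_pattern = (r'^(?P<remote_addr>.*?) - \[(?P<time_local>.*?)\] "(?P<request>.*?)"'
               r' (?P<status>.*?) (?P<body_bytes_sent>.*?) (?P<request_time>.*?)'
               r' "(?P<http_referer>.*?)" "(?P<http_user_agent>.*?)" - (?P<http_x_forwarded_for>.*)$')
log_pattern_obj = re.compile(log_pattern)


def parse(line):
    """Hypothetical helper: return all named groups as a dict, or None if the line does not match."""
    m = log_pattern_obj.search(line)  # run the regex once and reuse the match object
    if m is None:
        return None
    fields = m.groupdict()
    # "22/Feb/2017:23:59:01 +0800" -> "2017-02-22 23:59:01" (timezone offset dropped, as in the script)
    t = time.strptime(fields['time_local'].split()[0], '%d/%b/%Y:%H:%M:%S')
    fields['time_mysql'] = time.strftime('%Y-%m-%d %H:%M:%S', t)
    return fields


# Made-up line shaped like the log_format; not taken from a real log
sample = ('27.221.112.163 - [22/Feb/2017:23:59:01 +0800] '
          '"GET /chapter/14278/28275.json?channel=ios&version=2.0.6 HTTP/1.1" '
          '200 218 0.028 "-" "okhttp/3.2.0" - 210.78.141.185, 27.221.112.163')

fields = parse(sample)
print(fields['status'], fields['request_time'], fields['time_mysql'])  # 200 0.028 2017-02-22 23:59:01
```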
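For nginx logs in other formats, or Apache logs, the same principle applies: write the regex as described above and adjust the database schema accordingly, and the script can process them just as easily.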
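The idea is simple, but the implementation turned out to have many pitfalls. With the log format above, where fields are separated by spaces or double quotes, the main problem is splitting and handling each field correctly when faced with all kinds of malformed records (the causes vary, and so do the shapes). That is why I use the re module rather than a simple split(). The code handles some "tolerable" anomalies with extra checks; for "intolerable" ones it returns empty values and prints the record, which ends up in a log file.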
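To illustrate what "tolerable" means in practice, here is the $request splitting step in isolation, using the same request_uri_pattern as the script; split_request() is a hypothetical helper name. Input such as a TLS handshake sent to the plain-HTTP port (nginx logs those bytes escaped, e.g. \x16\x03...) fails the pattern and would take the script's "$request abnormal" branch. Because the method group only lists GET, POST, HEAD and DELETE, requests using any other method (PUT, OPTIONS, ...) land in that branch too; extend the alternation if they matter to you.

```python
import re

# Same pattern as the script: "request_method request_uri server_protocol"
request_uri_pattern_obj = re.compile(
    r'^(?P<request_method>(GET|POST|HEAD|DELETE)?) (?P<request_uri>.*?) (?P<server_protocol>HTTP.*)$')


def split_request(request):
    """Return (method, uri, args) for a parsable $request, or None otherwise."""
    m = request_uri_pattern_obj.search(request)
    if m is None:
        return None  # e.g. an empty request or escaped binary garbage
    # Split on the first '?' only, like request_uri.split('?', 1) in the script
    uri, _, args = m.group('request_uri').partition('?')
    return m.group('request_method'), uri, args


print(split_request('GET /chapter/14278/28275.json?channel=ios&version=2.0.6 HTTP/1.1'))
print(split_request('\\x16\\x03\\x01\\x00'))  # None
```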
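For malformed requests like these, the best approach is really to use a special character, for example "|", as the field separator when defining the log format in nginx. Then you don't need Python's re module at all: a plain string split gets every field, as long as the separator never appears inside a field.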
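A sketch of the separator approach, assuming a hypothetical nginx format that joins the same variables with "|" (shown in the comment; the format name pipe and the field order are assumptions). nginx's default log escaping covers characters such as the double quote and backslash but not "|", so a client can still smuggle one into the User-Agent or request line. The sketch therefore checks the field count and rejects lines that do not split into exactly the expected number of fields.

```python
# Hypothetical field order, assuming a log_format such as
#   log_format pipe '$remote_addr|$time_local|$request|$status|$body_bytes_sent|'
#                   '$request_time|$http_referer|$http_user_agent|$http_x_forwarded_for';
FIELDS = ('remote_addr', 'time_local', 'request', 'status', 'body_bytes_sent',
          'request_time', 'http_referer', 'http_user_agent', 'http_x_forwarded_for')


def parse_pipe_line(line):
    """Split a '|'-delimited log line into a dict; None if the field count is wrong."""
    parts = line.rstrip('\n').split('|')
    if len(parts) != len(FIELDS):
        # A literal '|' inside a field would shift every later column,
        # so such lines are rejected instead of being silently mis-assigned.
        return None
    return dict(zip(FIELDS, parts))


print(parse_pipe_line('1.2.3.4|22/Feb/2017:23:59:01 +0800|GET / HTTP/1.1|200|612|0.001|-|curl/7.29.0|-\n'))
```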
Now let's look at the result.
First, one record from the database:
*************************** 9. row ***************************
id: 9
server: web6
uri_abs: /chapter/?/?.json
uri_abs_crc32: 443227294
args_abs: channel=ios&version=?
args_abs_crc32: 2972340533
time_local: 2017-02-22 23:59:01
response_code: 200
bytes_sent: 218
request_time: 0.028
user_ip: 210.78.141.185
cdn_ip: 27.221.112.163
request_method: GET
uri: /chapter/14278/28275.json
args: channel=ios&version=2.0.6
referer:
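Here uri_abs and args_abs are uri and args after abstraction (extracting a pattern from them): every uri segment and every args value is replaced with "?" unless it consists entirely of [a-zA-Z-_] characters, while a file extension such as .json is kept, as the record above shows. The uri_abs_crc32 and args_abs_crc32 columns hold the crc32 of the abstracted values; they exist only to make grouping and aggregating by uri or args in MySQL faster.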
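A simplified, standalone sketch of the abstraction rule and the crc32 columns. It is not the script's text_abstract() verbatim: for a segment with several dots it keeps the whole extension (?.tar.gz) where the script keeps only the first part (?.tar), and repeated query keys are kept rather than merged. The names abstract_uri, abstract_args and crc are hypothetical. crc32 yields an unsigned 32-bit integer, which is much cheaper to index and GROUP BY than a varchar(200); the trade-off is that two different patterns can occasionally share a crc32 and get counted together.

```python
import re
from zlib import crc32

# A value survives abstraction only if it consists solely of letters, '-' or '_'
WORD = re.compile(r'[a-zA-Z\-_]+$')


def abstract_uri(uri):
    """Keep word-only path segments; replace the name part of any other
    segment with '?', keeping its extension (e.g. 28275.json -> ?.json)."""
    segments = []
    for seg in uri.split('/'):
        if seg == '':
            continue  # skip empty pieces from leading, trailing or double slashes
        name, dot, ext = seg.partition('.')
        segments.append(seg if WORD.match(name) else '?' + dot + ext)
    result = '/' + '/'.join(segments)
    if segments and uri.endswith('/'):
        result += '/'  # keep a trailing slash, as the script does
    return result


def abstract_args(args):
    """Keep every key; keep a value only if it is word-only, otherwise use '?'."""
    if args == '':
        return ''
    pairs = []
    for item in args.split('&'):
        key, eq, value = item.partition('=')
        if not eq:
            return '?'  # a parameter without '=' turns the whole string into '?'
        pairs.append('{}={}'.format(key, value if WORD.match(value) else '?'))
    return '&'.join(pairs)


def crc(text):
    """crc32 of the abstracted text, 0 for an empty string (the script does this for args)."""
    return crc32(text.encode()) if text else 0


uri_abs = abstract_uri('/chapter/14278/28275.json')
args_abs = abstract_args('channel=ios&version=2.0.6')
print(uri_abs, args_abs)  # /chapter/?/?.json channel=ios&version=?
print(crc(uri_abs), crc(args_abs))
```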
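There is no unified entry script for analysis yet, so queries are still written as SQL statements (this demands some SQL skill from the user; not friendly, to be improved):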
mysql> select count(*) from www where time_local>='2016-12-09 00:00:00' and time_local<='2016-12-09 23:59:59';
mysql> select count(*) from www where uri_abs_crc32=2043925204 and time_local > '2016-11-23 10:00:00' and time_local <'2016-11-23 23:59:59';
mysql> select uri_abs,count(*) as num,sum(request_time) as total_time,sum(request_time)/count(*) as average_time from www group by uri_abs_crc32 order by num desc limit 5;
+------------------------------------------+---------+------------+--------------+
| uri_abs                                  | num     | total_time | average_time |
+------------------------------------------+---------+------------+--------------+
| /comicsum/comicshot.php                  | 2700716 |   1348.941 |    0.0004995 |
| /category/?.html                         |  284788 | 244809.877 |    0.8596215 |
| /                                        |   72429 |   1172.113 |    0.0161829 |
| /static/hits/?.json                      |   27451 |      7.658 |    0.0002790 |
| /dynamic/o_search/searchKeyword          |   26230 |   3757.661 |    0.1432581 |
+------------------------------------------+---------+------------+--------------+
10 rows in set (40.09 sec)
mysql> select uri_abs,count(*) as num,sum(bytes_sent) as total_bytes,sum(bytes_sent)/count(*) as average_bytes from www group by uri_abs_crc32 order by num desc,average_bytes desc limit 10;
+------------------------------------------+---------+-------------+---------------+
| uri_abs                                  | num     | total_bytes | average_bytes |
+------------------------------------------+---------+-------------+---------------+
| /comicsum/comicshot.php                  | 2700716 |    72889752 |       26.9890 |
| /category/?.html                         |  284788 |  3232744794 |    11351.4080 |
| /                                        |   72429 |  1904692759 |    26297.3776 |
| /static/hits/?.json                      |   27451 |     5160560 |      187.9917 |
| /dynamic/o_search/searchKeyword          |   26230 |     3639846 |      138.7665 |
+------------------------------------------+---------+-------------+---------------+
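The statistics above are plain GROUP BY aggregations on the integer crc32 column. The sketch below runs the shape of the first query against Python's built-in sqlite3 in memory, purely as a stand-in for MySQL so it can run anywhere; the rows are invented. Against the real database the same SQL string could be sent through a pymysql cursor (cursor.execute() followed by fetchall()).

```python
import sqlite3
from zlib import crc32

# In-memory SQLite as a stand-in for MySQL, only to show what the aggregation computes
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE www (uri_abs TEXT, uri_abs_crc32 INTEGER, request_time REAL)')

# Invented sample rows: (uri_abs, request_time)
samples = [('/category/?.html', 0.5), ('/category/?.html', 1.0), ('/', 0.25)]
conn.executemany('INSERT INTO www VALUES (?, ?, ?)',
                 [(u, crc32(u.encode()), t) for u, t in samples])

# Same shape as the first query above: group on the integer crc32 column
query = ('SELECT uri_abs, COUNT(*) AS num, SUM(request_time) AS total_time, '
         'SUM(request_time) / COUNT(*) AS average_time '
         'FROM www GROUP BY uri_abs_crc32 ORDER BY num DESC LIMIT 5')
results = conn.execute(query).fetchall()
for row in results:
    print(row)
# ('/category/?.html', 2, 1.5, 0.75)
# ('/', 1, 0.25, 0.25)
conn.close()
```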
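These are only a few examples. Apart from the UA (which the code captures but I don't need), essentially all of the information is already in the table, so it can back almost any question about site traffic, load or response times with data.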
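External Python dependency: pymysql
MySQL (I use 5.6): set innodb_file_format to Barracuda (this does not affect existing databases or tables, so it is harmless even on a production server) so that the CREATE TABLE statement can use ROW_FORMAT=COMPRESSED to make the InnoDB table compressed. In MySQL 5.6 compressed tables also require innodb_file_per_table, which is enabled by default from 5.6.6. In my tests, enabling compression reduced the data file size by nearly 50%.
Now the code: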
#!/usr/bin/env python3
# coding:utf-8
"""
ljk 20161116 (updated 20170217)
This script should be put in crontab on every web server and executed every n minutes.
Collect the nginx access logs, process them and insert the results into MySQL.
"""
import os
import re
import subprocess
import time
import warnings
from collections import OrderedDict
from multiprocessing import Pool
from socket import gethostname
from sys import argv, exit
from urllib.parse import unquote
from zlib import crc32

import pymysql

##### Customizable section #####
# Log format regex: non-greedy named groups; must follow the separators and quotes of the nginx log_format exactly
log_pattern = (r'^(?P<remote_addr>.*?) - \[(?P<time_local>.*?)\] "(?P<request>.*?)"'
               r' (?P<status>.*?) (?P<body_bytes_sent>.*?) (?P<request_time>.*?)'
               r' "(?P<http_referer>.*?)" "(?P<http_user_agent>.*?)" - (?P<http_x_forwarded_for>.*)$')
# Regex for $request, which is made up of "request_method request_uri server_protocol"
request_uri_pattern = r'^(?P<request_method>(GET|POST|HEAD|DELETE)?) (?P<request_uri>.*?) (?P<server_protocol>HTTP.*)$'
# Log directory
log_dir = '/nginx_log/'
# Log file naming: mine look like www.access.log; a name only needs to start with xxx.access, the rest is free
# Sites to process (add entries to the list as needed)
todo = ['www', 'news', 'm.api', ]
# MySQL settings
mysql_host = 'xxxx'
mysql_user = 'xxxx'
mysql_passwd = 'xxxx'
mysql_port = 'xxxx'  # converted with int() in my_connect()
mysql_database = 'xxxx'
# Table schema: every column has a default value, so even if your log format differs from mine
# the schema and the INSERT statement need not change
creat_table = """CREATE TABLE IF NOT EXISTS {} (
    id bigint unsigned NOT NULL AUTO_INCREMENT PRIMARY KEY,
    server char(11) NOT NULL DEFAULT '',
    uri_abs varchar(200) NOT NULL DEFAULT '' COMMENT 'URL-decoded $uri, then abstracted',
    uri_abs_crc32 bigint unsigned NOT NULL DEFAULT '0' COMMENT 'crc32 of uri_abs',
    args_abs varchar(200) NOT NULL DEFAULT '' COMMENT 'URL-decoded $args, then abstracted',
    args_abs_crc32 bigint unsigned NOT NULL DEFAULT '0' COMMENT 'crc32 of args_abs',
    time_local timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
    response_code smallint NOT NULL DEFAULT '0',
    bytes_sent int NOT NULL DEFAULT '0' COMMENT 'size of the response sent to the client',
    request_time float(6,3) NOT NULL DEFAULT '0.000',
    user_ip varchar(40) NOT NULL DEFAULT '',
    cdn_ip varchar(15) NOT NULL DEFAULT '' COMMENT 'last CDN node IP: empty = not via CDN; - = via neither CDN nor F5',
    request_method varchar(7) NOT NULL DEFAULT '',
    uri varchar(255) NOT NULL DEFAULT '' COMMENT 'URL-decoded $uri',
    args varchar(255) NOT NULL DEFAULT '' COMMENT 'URL-decoded $args',
    referer varchar(255) NOT NULL DEFAULT '' COMMENT '',
    KEY time_local (time_local),
    KEY uri_abs_crc32 (uri_abs_crc32),
    KEY args_abs_crc32 (args_abs_crc32)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPRESSED"""
##### End of customizable section #####

# Compile the regexes once; process_line() uses these objects
log_pattern_obj = re.compile(log_pattern)
request_uri_pattern_obj = re.compile(request_uri_pattern)
# Host name
server = gethostname()
# Midnight today
today_start = time.strftime('%Y-%m-%d', time.localtime()) + ' 00:00:00'
# Turn pymysql warnings into catchable exceptions
warnings.filterwarnings('error', category=pymysql.err.Warning)


def my_connect():
    """Connect to the database"""
    global connection, con_cur
    try:
        connection = pymysql.connect(host=mysql_host, user=mysql_user, password=mysql_passwd,
                                     charset='utf8mb4', port=int(mysql_port), autocommit=True,
                                     database=mysql_database)
    except pymysql.err.MySQLError as err:
        print('Error: ' + str(err))
        exit(20)
    con_cur = connection.cursor()


def create_table(t_name):
    """Create the table for a site"""
    my_connect()
    try:
        con_cur.execute(creat_table.format(t_name))
    except pymysql.err.Warning:
        pass


def process_line(line_str):
    """
    Process one log record
    line_str: the record as a string
    Returns a 14-tuple in the column order used by insert_correct()
    """
    processed = log_pattern_obj.search(line_str)
    if not processed:
        # The regex cannot match this record at all: print it and return a placeholder row
        print("Can't process this line: {}".format(line_str))
        return server, '', 0, '', 0, '0000-00-00 00:00:00', 0, 0, 0, '', '', '', '', ''
    # remote_addr (the user's real IP if the client did not come through a proxy)
    remote_addr = processed.group('remote_addr')
    # time_local, converted to MySQL datetime format
    time_local = processed.group('time_local')
    ori_time = time.strptime(time_local.split()[0], '%d/%b/%Y:%H:%M:%S')
    new_time = time.strftime('%Y-%m-%d %H:%M:%S', ori_time)
    # Process uri and args
    request = processed.group('request')
    request_further = request_uri_pattern_obj.search(request)
    if request_further:
        request_method = request_further.group('request_method')
        request_uri = request_further.group('request_uri')
        uri_args = request_uri.split('?', 1)
        # URL-decode uri and args
        uri = unquote(uri_args[0])
        args = '' if len(uri_args) == 1 else unquote(uri_args[1])
        # Abstract uri and args
        uri_abs = text_abstract(uri, 'uri')
        args_abs = text_abstract(args, 'args')
        # crc32 of uri_abs and args_abs
        uri_abs_crc32 = crc32(uri_abs.encode())
        args_abs_crc32 = 0 if args_abs == '' else crc32(args_abs.encode())
    else:
        # "Tolerable" anomaly: keep the raw $request in uri, leave the derived columns empty
        print('$request abnormal: {}'.format(line_str))
        request_method = ''
        uri = request
        uri_abs = ''
        uri_abs_crc32 = 0
        args = ''
        args_abs = ''
        args_abs_crc32 = 0
    # Status code, bytes sent, response time
    response_code = processed.group('status')
    bytes_sent = processed.group('body_bytes_sent')
    request_time = processed.group('request_time')
    # user_ip, IP of the last CDN node, and whether the request went through F5
    http_x_forwarded_for = processed.group('http_x_forwarded_for')
    ips = http_x_forwarded_for.split()
    # user_ip: the user's real IP
    # cdn_ip: IP of the last CDN node; '' = not via CDN; '-' = via neither CDN nor F5
    if http_x_forwarded_for == '-' or not ips:
        # Via neither CDN nor F5 (an empty field is treated the same way)
        user_ip = remote_addr
        cdn_ip = '-'
    elif ips[0] == remote_addr:
        # Not via CDN, via F5
        user_ip = remote_addr
        cdn_ip = ''
    else:
        # Via CDN and F5
        user_ip = ips[0].rstrip(',')
        cdn_ip = ips[-1]
    return (server, uri_abs, uri_abs_crc32, args_abs, args_abs_crc32, new_time, response_code, bytes_sent,
            request_time, user_ip, cdn_ip, request_method, uri, args)


def text_abstract(text, what):
    """Further process uri and args: abstract them so they can be grouped
    e.g. uri: /article/10.html is abstracted to /article/?.html
    e.g. args: s=你好&type=0 is abstracted to s=?&type=?
    Rule: a part made up only of [a-zA-Z-_] is kept; anything else becomes '?'
    """
    tmp_abs = ''
    if what == 'uri':
        uri_list = [tmp for tmp in text.split('/') if tmp != '']
        if len(uri_list) == 0:
            # The uri is "/"
            tmp_abs = '/'
        else:
            for i in range(len(uri_list)):
                if not re.match(r'[a-zA-Z\-_]+?(\..*)?$', uri_list[i]):
                    # Segment does not follow the rule: convert it
                    if '.' in uri_list[i]:
                        if not re.match(r'[a-zA-Z\-_]+$', uri_list[i].split('.')[0]):
                            uri_list[i] = '?.' + uri_list[i].split('.')[1]
                    else:
                        uri_list[i] = '?'
            for v in uri_list:
                tmp_abs += '/{}'.format(v)
            if text.endswith('/'):
                # Keep a trailing "/" from the original uri
                tmp_abs += '/'
    elif what == 'args':
        if text == '':
            tmp_abs = ''
        else:
            try:
                tmp_dict = OrderedDict((tmp.split('=') for tmp in text.split('&')))
                for k, v in tmp_dict.items():
                    if not re.match(r'[a-zA-Z\-_]+$', v):
                        # Convert every value except those made only of letters, '-' or '_'
                        tmp_dict[k] = '?'
                for k, v in tmp_dict.items():
                    if tmp_abs == '':
                        tmp_abs += '{}={}'.format(k, v)
                    else:
                        tmp_abs += '&{}={}'.format(k, v)
            except ValueError:
                # A parameter without '=' (or with more than one '=') raises ValueError
                tmp_abs = '?'
    return tmp_abs


def insert_data(line_data, cursor, results, limit, t_name, l_name):
    """
    Collect processed rows and run one INSERT for every `limit` rows
    line_data: the raw line before processing
    limit: insert once every `limit` rows
    t_name: target table name
    l_name: log file name
    """
    line_result = process_line(line_data)
    results.append(line_result)
    # print('len(result):{}'.format(len(result)))  # debug
    if len(results) == limit:
        insert_correct(cursor, results, t_name, l_name)
        results.clear()
        print('{} {} processed up to {}'.format(time.strftime('%H:%M:%S', time.localtime()), l_name, line_result[5]))


def insert_correct(cursor, results, t_name, l_name):
    """Insert a batch of rows, handling errors raised during the insert"""
    insert_sql = ('insert into {} (server,uri_abs,uri_abs_crc32,args_abs,args_abs_crc32,time_local,response_code,'
                  'bytes_sent,request_time,user_ip,cdn_ip,request_method,uri,args) '
                  'values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'.format(t_name))
    try:
        cursor.executemany(insert_sql, results)
    except pymysql.err.Warning as err:
        print('\n{} Warning: {}'.format(l_name, err))
    except pymysql.err.MySQLError as err:
        print('\n{} Error: {}'.format(l_name, err))
        print('Error while inserting data...\n')
        connection.close()
        exit(10)


def get_prev_num(t_name, l_name):
    """Return the number of rows already stored today; t_name: table name, l_name: log file name"""
    try:
        con_cur.execute('select min(id) from {0} where time_local=('
                        'select min(time_local) from {0} where time_local>="{1}")'.format(t_name, today_start))
        min_id = con_cur.fetchone()[0]
        if min_id is not None:  # there is data for today
            con_cur.execute('select max(id) from {}'.format(t_name))
            max_id = con_cur.fetchone()[0]
            con_cur.execute(
                'select count(*) from {} where id>={} and id<={} and server="{}"'.format(t_name, min_id, max_id, server))
            prev_num = con_cur.fetchone()[0]
        else:
            prev_num = 0
        return prev_num
    except pymysql.err.MySQLError as err:
        print('Error: {}'.format(err))
        print('Error: could not get the number of stored rows, skipping {} this time\n'.format(l_name))
        return None


def del_old_data(t_name, l_name, n=3):
    """Delete data older than n days (default 3)"""
    # The point in time n days ago
    n_days_ago = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time() - 3600 * 24 * n))
    try:
        con_cur.execute('select max(id) from {0} where time_local=('
                        'select max(time_local) from {0} where time_local!="0000-00-00 00:00:00" and time_local<="{1}")'.format(
                            t_name, n_days_ago))
        max_id = con_cur.fetchone()[0]
        if max_id is not None:
            con_cur.execute('delete from {} where id<={}'.format(t_name, max_id))
    except pymysql.err.MySQLError as err:
        print('\n{} Error: {}'.format(l_name, err))
        print('Could not delete data older than {} days from the table...\n'.format(n))


def main_loop(log_name):
    """Main logic; log_name: log file name"""
    # Turn a site name such as m.api into m_api, since '.' cannot be used in an unquoted table name
    table_name = log_name.split('.access')[0].replace('.', '_')
    results = []
    # Create the table
    create_table(table_name)
    # Current total number of lines in the log file
    num = int(subprocess.run('wc -l {}'.format(log_dir + log_name), shell=True, stdout=subprocess.PIPE,
                             universal_newlines=True).stdout.split()[0])
    print('num: {}'.format(num))  # debug
    # Number of lines handled by previous runs
    prev_num = get_prev_num(table_name, log_name)
    if prev_num is not None:
        # Compare the current line count with what was already processed to decide which lines to handle now
        i = 0
        # errors='replace' keeps one malformed byte sequence from aborting the whole file
        with open(log_name, encoding='utf-8', errors='replace') as fp:
            for line in fp:
                i += 1
                if i <= prev_num:
                    continue
                elif prev_num < i <= num:
                    insert_data(line, con_cur, results, 1000, table_name, log_name)
                else:
                    break
        # Insert the remaining rows (fewer than 1000)
        if len(results) > 0:
            insert_correct(con_cur, results, table_name, log_name)
    del_old_data(table_name, log_name)


if __name__ == "__main__":
    # Exit if another instance of this script is already running
    if_run = subprocess.run('ps -ef|grep {}|grep -v grep|grep -v "/bin/sh"|wc -l'.format(argv[0]), shell=True,
                            stdout=subprocess.PIPE).stdout
    if if_run.decode().strip() == '1':
        os.chdir(log_dir)
        logs_list = os.listdir(log_dir)
        logs_list = [i for i in logs_list if 'access' in i and os.path.isfile(i) and i.split('.access')[0] in todo]
        if len(logs_list) > 0:
            # Process the log files in parallel, one worker per file
            with Pool(len(logs_list)) as p:
                p.map(main_loop, logs_list)
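The X-Forwarded-For branch in process_line() is the least obvious part, so here it is as a hypothetical pure function, classify_ips(), that follows the script's branches, plus a guard for an empty field. The meaning of each case comes from the script's own comments and depends on how the author's CDN and F5 add entries to X-Forwarded-For; a different setup may need different rules.

```python
def classify_ips(remote_addr, http_x_forwarded_for):
    """Return (user_ip, cdn_ip) following the branches in process_line().

    cdn_ip == '-' : neither CDN nor F5 (X-Forwarded-For logged as '-')
    cdn_ip == ''  : F5 only (first X-Forwarded-For entry equals remote_addr)
    otherwise     : CDN and F5; the last entry is taken as the CDN node
    """
    ips = http_x_forwarded_for.split()
    if http_x_forwarded_for == '-' or not ips:
        # An empty field would otherwise raise IndexError on ips[0]
        return remote_addr, '-'
    if ips[0] == remote_addr:
        return remote_addr, ''
    # Entries are separated by ", ", so strip the trailing comma from the first one
    return ips[0].rstrip(','), ips[-1]


# Hypothetical inputs for the three cases
print(classify_ips('210.78.141.185', '-'))               # ('210.78.141.185', '-')
print(classify_ips('210.78.141.185', '210.78.141.185'))  # ('210.78.141.185', '')
print(classify_ips('27.221.112.163', '210.78.141.185, 27.221.112.163'))  # ('210.78.141.185', '27.221.112.163')
```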
Finally, set up a cron job at whatever interval you need, for example every 30 minutes:
*/30 * * * * export LANG=zh_CN.UTF-8;python3 /root/log_analyse_parall.py > /tmp/log_analyse.py3 2>&1
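Because the job runs repeatedly, main_loop() only inserts the lines between the count already stored for today (get_prev_num()) and the line count measured by wc -l at the start of the run. The sketch below expresses that window with itertools.islice; new_lines() and the temporary demo file are hypothetical. Since get_prev_num() only counts rows dated today, the scheme appears to assume the log file is rotated at midnight, so that line 1 of the file is the first request of the day.

```python
import os
import tempfile
from itertools import islice


def new_lines(path, prev_num, num):
    """Yield lines prev_num+1 .. num (1-based) of the file at path.

    prev_num: lines already imported (what get_prev_num() returns in the script)
    num:      line count taken at the start of the run (wc -l in the script), so
              lines appended while this run is working are left for the next run
    """
    with open(path, encoding='utf-8', errors='replace') as fp:
        yield from islice(fp, prev_num, num)


# Demo with a throwaway file standing in for an access log
with tempfile.NamedTemporaryFile('w', suffix='.log', delete=False) as tmp:
    tmp.write('line1\nline2\nline3\nline4\n')
demo = list(new_lines(tmp.name, 1, 3))
os.remove(tmp.name)
print(demo)  # ['line2\n', 'line3\n']
```

Compared with counting every line by hand, islice still reads the skipped lines (a text file cannot be indexed by line number), but it keeps the skip-and-stop logic in one place.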
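For a script that analyzes large amounts of text in an unpredictable format, generality and execution efficiency both matter a great deal. Generality was covered roughly above; for performance, here are my test results on a 4-core virtual machine:
# 4 log files with 800k records in total (200k each), processed concurrently: the main process forks 4 child processes
# Processing time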
[ljk@git-svn ~]# time python3 shells/log_analyse_parall.py > /tmp/new_log.txt
real 0m24.057s
user 1m5.417s
sys 0m0.595s
# Together the 4 processes handled about 33,000 lines per second on average (800,000 lines / 24 s)
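The per-second figure follows directly from the timing output; the short calculation below spells it out using only the numbers printed above. It also divides user time by real time: the 4 workers kept about 2.7 cores busy on average, so part of the wall-clock time went to something other than Python CPU work (for example waiting on MySQL or disk), although the timing alone does not say what.

```python
# Figures from the timing run above: 4 files x 200,000 lines, 4 worker processes
total_lines = 4 * 200_000
real_s = 24.057   # wall-clock seconds ("real 0m24.057s")
user_s = 65.417   # CPU seconds summed over all processes ("user 1m5.417s")

overall = total_lines / real_s   # lines per second across all workers
per_process = overall / 4        # average per worker
busy_cores = user_s / real_s     # cores kept busy on average

print(round(overall), round(per_process), round(busy_cores, 2))  # 33254 8314 2.72
```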
Web log analysis with Python + MySQL