当前位置:Gxlcms > 数据库问题 > Python+MySQL实现web日志分析

Python+MySQL实现web日志分析

时间:2021-07-01 10:21:17 帮助过:3人阅读

(本文已不再同步更新,最新代码请见github)
日志分析在web系统中故障排查、性能分析方面有着非常重要的作用。目前,开源的ELK系统是成熟且功能强大的选择。但是部署及学习成本亦然不低,这里我实现了一个方法上相对简单(但准确度和效率是有保证的)的实现。另外该脚本的侧重点不是通常的PV,UV等展示,而是短期内(如三天历史or一周历史)提供细粒度的异常和性能分析。

先说一下我想实现这个功能的驱动力(痛点)吧:
我们有不少站点,前边有CDN,原站前面是F5,走到源站的访问总量日均PV约5000w。下面是我们经常面临一些问题:

  • CDN回源异常,可能导致我们源站带宽和负载都面临较大的压力。这时需要能快速的定位到是多了哪些回源IP(即CDN节点)或是某个IP的回源量异常,又或是哪些url的回源量异常
  • 在排除了CDN回源问题之后,根据zabbix监控对一些异常的流量或者负载波动按异常时段对比正常时段进行分析,定位到具体的某(几)类url。反馈给开发进行review以及优化
  • 有时zabbix会监控到应用服务器和DB或者缓存服务器之间的流量异常,这种问题一般定位起来是比较麻烦的,甚至波动仅仅是在一两分钟内,这就需要对日志有一个非常精细的分析粒度
  • 我们希望能所有的应用服务器能过在本机分析日志(分布式的思想),然后将分析结果汇总到一起(MySQL)以便查看;并且还希望能尽可能的实时(将定时任务间隔设置低一些),以便发现问题后能尽快的通过此平台进行分析
  • 通用性能:对于不同的日志格式只需对脚本稍加改动即可分析;因为将日志分析放在应用服务器本机,所以脚本的性能和效率也要有保证,不能影响业务

再说下原理:
比较简单,就是利用python的re模块通过正则表达式对日志进行分析处理,取得uriargs时间当前状态码响应大小响应时间用户IPCDN ipserver name 等信息存储进数据库。

当然前提规范也是必须的:

  • 各台server的日志文件按统一路径存放
  • 日志格式保持一致
  • 每天的0点日志切割

我的nginx日志格式如下:

log_format  access  ‘$remote_addr - [$time_local] "$request" ‘
             ‘$status $body_bytes_sent $request_time "$http_referer" ‘
             ‘"$http_user_agent" - $http_x_forwarded_for‘;

日志分析原理:
通过Python的re模块,按照应用服务器的日志格式编写正则,例如按照我的日志格式,写出的正则如下(编写正则时,先不要换行,确保空格或引号等与日志格式一致,最后考虑美观可以折行)

log_pattern = r‘^(?P<remote_addr>.*?) - \[(?P<time_local>.*?)\] "(?P<request>.*?)"‘               r‘ (?P<status>.*?) (?P<body_bytes_sent>.*?) (?P<request_time>.*?)‘               r‘ "(?P<http_referer>.*?)" "(?P<http_user_agent>.*?)" - (?P<http_x_forwarded_for>.*)$‘

log_pattern_obj = re.compile(log_pattern)

用以上正则来整体匹配一行日志记录,然后各个部分可以通过log_pattern_obj.search(log).group(‘remote_addr‘)log_pattern_obj.search(log).group(‘body_bytes_sent‘)等形式来访问

对于其他格式的nginx日志或者Apache日志,按照如上原则,并对数据库结构做相应调整,都可以轻松的使用该脚本分析处理。

原理虽简单但实现起来却发现有好多坑,主要是按照上述的日志格式(靠空格或双引号来分割各段)主要问题是面对各种不规范的记录时(原因不一而足,而且也是样式繁多),如何正确的分割及处理日志的各字段,这也是我用re模块而不是简单的split()函数的原因。代码里对一些“可以容忍”的异常记录通过一些判断逻辑予以处理;对于“无法容忍”的异常记录则返回空字符串并将日志记录于文件。

其实对于上述的这些不规范的请求,最好的办法是在nginx中定义日志格式时,用一个特殊字符作为分隔符,例如“|”。这样都不用Python的re模块,直接字符串分割就能正确的获取到各段。

接下来看看使用效果:
先看一行数据库里的记录

*************************** 9. row ***************************
            id: 9
        server: web6
       uri_abs: /chapter/?/?.json
 uri_abs_crc32: 443227294
      args_abs: channel=ios&version=?
args_abs_crc32: 2972340533
    time_local: 2017-02-22 23:59:01
 response_code: 200
    bytes_sent: 218
  request_time: 0.028
       user_ip: 210.78.141.185
        cdn_ip: 27.221.112.163
request_method: GET
           uri: /chapter/14278/28275.json
          args: channel=ios&version=2.0.6
       referer:

其中uri_absargs_abs是对uri和args进行抽象化(抽象出一个模式出来)处理之后的结果。对uri中个段和args中的value部分除了完全由[a-zA-Z-_]+组成的部分之外的部分都用“?”做替换。uri_abs_crc32args_abs_crc32两列是对抽象化结果进行crc32计算,这两列单纯只是为了在MySQL中对uri或args进行分类统计汇总时得到更好的性能。

现在还没有完成统一分析的入口脚本,所以还是以sql语句的形式来查询(对用户的sql功底有要求,不友好待改善)

  • 查询某站点日/小时pv(其实这一套东西的关注点并不在类似的基础的统计上)
    select count(*) from www where time_local>=‘2016-12-09 00:00:00‘ and time_local<=‘2016-12-09 23:59:59‘
  • 查询某类型url总量(or指定时间段内该url总量)
    依据表中的url_abs_crc32字段
    mysql> select count(*) from www where uri_abs_crc32=2043925204 and time_local > ‘2016-11-23 10:00:00‘ and time_local <‘2016-11-23 23:59:59‘;
  • 平均响应时间排行(可基于总量分析;亦可根据时段对比分析)
    mysql> select uri_abs,count(*) as num,sum(request_time) as total_time,sum(request_time)/count(*) as average_time from www group by uri_abs_crc32 order by num desc limit 5;
    +------------------------------------------+---------+------------+--------------+
    | uri_abs                                  | num     | total_time | average_time |
    +------------------------------------------+---------+------------+--------------+
    | /comicsum/comicshot.php                  | 2700716 |   1348.941 |    0.0004995 |
    | /category/?.html                         |  284788 | 244809.877 |    0.8596215 |
    | /                                        |   72429 |   1172.113 |    0.0161829 |
    | /static/hits/?.json                      |   27451 |      7.658 |    0.0002790 |
    | /dynamic/o_search/searchKeyword          |   26230 |   3757.661 |    0.1432581 |
    +------------------------------------------+---------+------------+--------------+
    10 rows in set (40.09 sec)
    • 平均响应大小排行
      mysql> select uri_abs,count(*) as num,sum(bytes_sent) as total_bytes,sum(bytes_sent)/count(*) as average_bytes from www group by uri_abs_crc32 order by num desc,average_bytes desc limit 10;    
      +------------------------------------------+---------+-------------+---------------+
      | uri_abs                                  | num     | total_bytes | average_bytes |
      +------------------------------------------+---------+-------------+---------------+
      | /comicsum/comicshot.php                  | 2700716 |    72889752 |       26.9890 |
      | /category/?.html                         |  284788 |  3232744794 |    11351.4080 |
      | /                                        |   72429 |  1904692759 |    26297.3776 |
      | /static/hits/?.json                      |   27451 |     5160560 |      187.9917 |
      | /dynamic/o_search/searchKeyword          |   26230 |     3639846 |      138.7665 |
      +------------------------------------------+---------+-------------+---------------+

      以上只列举了几个例子,基本上除了UA部分(代码中已有捕捉,但是笔者用不到),其他的信息都以包含到表中。因此几乎可以对网站流量负载,响应时间等方面的任何疑问给出数据上的支持。

Python外部包依赖:pymysql
MySQL(笔者5.6版本)将innodb_file_format设置为Barracuda(这个设置并不对其他库表产生影响,即使生产数据库设置也无妨),以便在建表语句中可以通过ROW_FORMAT=COMPRESSED将innodb表这只为压缩模式,笔者实验开启压缩模式后,数据文件大小将近减小50%。

接下来请看代码:

#!/bin/env python3
# coding:utf-8
"""
ljk 20161116(update 20170217)
This script should be put in crontab in every web server.Execute every n minutes.
Collect nginx access log, process it and insert the result into mysql.
"""
import os
import re
import subprocess
import time
import warnings
import pymysql
from sys import argv, exit
from socket import gethostname
from urllib.parse import unquote
from zlib import crc32
from multiprocessing import Pool

##### 自定义部分 #####
# 定义日志格式,利用非贪婪匹配和分组匹配,需要严格参照日志定义中的分隔符和引号
log_pattern = r‘^(?P<remote_addr>.*?) - \[(?P<time_local>.*?)\] "(?P<request>.*?)"‘               r‘ (?P<status>.*?) (?P<body_bytes_sent>.*?) (?P<request_time>.*?)‘               r‘ "(?P<http_referer>.*?)" "(?P<http_user_agent>.*?)" - (?P<http_x_forwarded_for>.*)$‘
# request的正则,其实是由 "request_method request_uri server_protocol"三部分组成
request_uri_pattern = r‘^(?P<request_method>(GET|POST|HEAD|DELETE)?) (?P<request_uri>.*?) (?P<server_protocol>HTTP.*)$‘
# 日志目录
log_dir = ‘/nginx_log/‘
# 日志文件命名:作者场景是www.access.log格式,只要保证xxx.access即可,后面随意
# 要处理的站点(可随需要想list中添加)
todo = [‘www‘, ‘news‘, ‘m.api‘,]
# MySQL相关设置
mysql_host = ‘xxxx‘
mysql_user = ‘xxxx‘
mysql_passwd = ‘xxxx‘
mysql_port = ‘xxxx‘
mysql_database = ‘xxxx‘
# 表结构:所有字段均加了默认值,所以即使日志格式和我的不同也可以不用修改表结构和插入语句
creat_table = "CREATE TABLE IF NOT EXISTS {} (                id bigint unsigned NOT NULL AUTO_INCREMENT PRIMARY KEY,                server char(11) NOT NULL DEFAULT ‘‘,                uri_abs varchar(200) NOT NULL DEFAULT ‘‘ COMMENT ‘对$uri做uridecode,然后做抽象化处理‘,                uri_abs_crc32 bigint unsigned NOT NULL DEFAULT ‘0‘ COMMENT ‘对上面uri_abs字段计算crc32‘,                args_abs varchar(200) NOT NULL DEFAULT ‘‘ COMMENT ‘对$args做uridecode,然后做抽象化处理‘,                args_abs_crc32 bigint unsigned NOT NULL DEFAULT ‘0‘ COMMENT ‘对上面args字段计算crc32‘,                time_local timestamp NOT NULL DEFAULT ‘0000-00-00 00:00:00‘,                response_code smallint NOT NULL DEFAULT ‘0‘,                bytes_sent int NOT NULL DEFAULT ‘0‘ COMMENT ‘发送给客户端的响应大小‘,                request_time float(6,3) NOT NULL DEFAULT ‘0.000‘,                user_ip varchar(40) NOT NULL DEFAULT ‘‘,                cdn_ip varchar(15) NOT NULL DEFAULT ‘‘ COMMENT ‘CDN最后节点的ip:空字串表示没经过CDN; - 表示没经过CDN和F5‘,                request_method varchar(7) NOT NULL DEFAULT ‘‘,                uri varchar(255) NOT NULL DEFAULT ‘‘ COMMENT ‘$uri,已做uridecode‘,                args varchar(255) NOT NULL DEFAULT ‘‘ COMMENT ‘$args,已做uridecode‘,                referer varchar(255) NOT NULL DEFAULT ‘‘ COMMENT ‘‘,                KEY time_local (time_local),                KEY uri_abs_crc32 (uri_abs_crc32),                KEY args_abs_crc32 (args_abs_crc32)            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 row_format=compressed"
##### 自定义部分结束 #####

# 主机名
global server
server = gethostname()
# 今天零点
global today_start
today_start = time.strftime(‘%Y-%m-%d‘, time.localtime()) + ‘ 00:00:00‘
# 将pymysql对于操作中的警告信息转为可捕捉的异常
warnings.filterwarnings(‘error‘, category=pymysql.err.Warning)

def my_connect():
    """链接数据库"""
    global connection, con_cur
    try:
        connection = pymysql.connect(host=mysql_host, user=mysql_user, password=mysql_passwd,
                                     charset=‘utf8mb4‘, port=mysql_port, autocommit=True, database=mysql_database)
    except pymysql.err.MySQLError as err:
        print(‘Error: ‘ + str(err))
        exit(20)
    con_cur = connection.cursor()

def create_table(t_name):
    """创建各站点对应的表"""
    my_connect()
    try:
        con_cur.execute(creat_table.format(t_name))
    except pymysql.err.Warning:
        pass

def process_line(line_str):
    """
    处理每一行记录
    line_str: 该行数据的字符串形式
    """
    processed = log_pattern_obj.search(line_str)
    if not processed:
        ‘‘‘如果正则根本就无法匹配该行记录时‘‘‘
        print("Can‘t process this line: {}".format(line_str))
        return server, ‘‘, 0, ‘‘, 0, ‘‘, ‘‘, ‘‘, ‘‘, ‘‘, ‘‘
    else:
        # remote_addr (客户若不经过代理,则可认为用户的真实ip)
        remote_addr = processed.group(‘remote_addr‘)

        # time_local
        time_local = processed.group(‘time_local‘)
        # 转换时间为mysql date类型
        ori_time = time.strptime(time_local.split()[0], ‘%d/%b/%Y:%H:%M:%S‘)
        new_time = time.strftime(‘%Y-%m-%d %H:%M:%S‘, ori_time)

        # 处理uri和args
        request = processed.group(‘request‘)
        request_further = request_uri_pattern_obj.search(request)
        if request_further:
            request_method = request_further.group(‘request_method‘)
            request_uri = request_further.group(‘request_uri‘)
            uri_args = request_uri.split(‘?‘, 1)
            # 对uri和args进行urldecode
            uri = unquote(uri_args[0])
            args = ‘‘ if len(uri_args) == 1 else unquote(uri_args[1])
            # 对uri和args进行抽象化
            uri_abs = text_abstract(uri, ‘uri‘)
            args_abs = text_abstract(args, ‘args‘)
            # 对库里的uri_abs和args_abs字段进行crc32校验
            uri_abs_crc32 = crc32(uri_abs.encode())
            args_abs_crc32 = 0 if args_abs == ‘‘ else crc32(args_abs.encode())
        else:
            print(‘$request abnormal: {}‘.format(line_str))
            request_method = ‘‘
            uri = request
            uri_abs = ‘‘
            uri_abs_crc32 = 0
            args = ‘‘
            args_abs = ‘‘
            args_abs_crc32 = 0

        # 状态码,字节数,响应时间
        response_code = processed.group(‘status‘)
        bytes_sent = processed.group(‘body_bytes_sent‘)
        request_time = processed.group(‘request_time‘)

        # user_ip,cdn最后节点ip,以及是否经过F5
        http_x_forwarded_for = processed.group(‘http_x_forwarded_for‘)
        ips = http_x_forwarded_for.split()
        # user_ip:用户真实ip
        # cdn_ip: CDN最后节点的ip,‘‘表示没经过CDN;‘-‘表示没经过CDN和F5
        if http_x_forwarded_for == ‘-‘:
            ‘‘‘没经过CDN和F5‘‘‘
            user_ip = remote_addr
            cdn_ip = ‘-‘
        elif ips[0] == remote_addr:
            ‘‘‘没经过CDN,经过F5‘‘‘
            user_ip = remote_addr
            cdn_ip = ‘‘
        else:
            ‘‘‘经过CDN和F5‘‘‘
            user_ip = ips[0].rstrip(‘,‘)
            cdn_ip = ips[-1]

        return (server, uri_abs, uri_abs_crc32, args_abs, args_abs_crc32, new_time, response_code, bytes_sent,
                request_time, user_ip, cdn_ip, request_method, uri, args)

def text_abstract(text, what):
    """进一步处理uri和args,将其做抽象化,方便对其进行归类
    如uri: /article/10.html 抽象为 /article/?.html
    如args: s=你好&type=0 抽象为 s=?&type=?
    规则:待处理部分由[a-zA-Z\-_]组成的,则保留,其他情况值转为‘?‘
    """
    tmp_abs = ‘‘
    if what == ‘uri‘:
        uri_list = [tmp for tmp in text.split(‘/‘) if tmp != ‘‘]
        if len(uri_list) == 0:
            ‘‘‘uri为"/"的情况‘‘‘
            tmp_abs = ‘/‘
        else:
            for i in range(len(uri_list)):
                if not re.match(r‘[a-zA-Z\-_]+?(\..*)?$‘, uri_list[i]):
                    ‘‘‘uri不符合规则时,进行转换‘‘‘
                    if ‘.‘ in uri_list[i]:
                        if not re.match(r‘[a-zA-Z\-_]+$‘, uri_list[i].split(‘.‘)[0]):
                            uri_list[i] = ‘?.‘ + uri_list[i].split(‘.‘)[1]
                    else:
                        uri_list[i] = ‘?‘
            for v in uri_list:
                tmp_abs += ‘/{}‘.format(v)
            if text.endswith(‘/‘):
                ‘‘‘如果原uri后面有"/",要保留‘‘‘
                tmp_abs += ‘/‘
    elif what == ‘args‘:
            if text == ‘‘:
                tmp_abs = ‘‘
            else:
                try:
                    tmp_dict = OrderedDict((tmp.split(‘=‘) for tmp in text.split(‘&‘)))
                    for k, v in tmp_dict.items():
                        if not re.match(r‘[a-zA-Z\-_]+$‘, v):
                            ‘‘‘除了value值为全字母的情况,都进行转换‘‘‘
                            tmp_dict[k] = ‘?‘
                    for k, v in tmp_dict.items():
                        if tmp_abs == ‘‘:
                            tmp_abs += ‘{}={}‘.format(k, v)
                        else:
                            tmp_abs += ‘&{}={}‘.format(k, v)
                except ValueError:
                    ‘‘‘参数中没有= 或者 即没&也没= 会抛出ValueError‘‘‘
                    tmp_abs = ‘?‘
    return tmp_abs

def insert_data(line_data, cursor, results, limit, t_name, l_name):
    """
    记录处理之后的数据,累积limit条执行一次插入
    line_data:每行处理之前的字符串数据;
    limit:每limit行执行一次数据插入;
    t_name:对应的表名;
    l_name:日志文件名
    """
    line_result = process_line(line_data)
    results.append(line_result)
    # print(‘len(result):{}‘.format(len(result)))    #debug
    if len(results) == limit:
        insert_correct(cursor, results, t_name, l_name)
        results.clear()
        print(‘{} {} 处理至 {}‘.format(time.strftime(‘%H:%M:%S‘, time.localtime()), l_name, line_result[5]))

def insert_correct(cursor, results, t_name, l_name):
    """在插入数据过程中处理异常"""
    insert_sql = ‘insert into {} (server,uri_abs,uri_abs_crc32,args_abs,args_abs_crc32,time_local,response_code,‘                  ‘bytes_sent,request_time,user_ip,cdn_ip,request_method,uri,args) ‘                  ‘values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)‘.format(t_name)
    try:
        cursor.executemany(insert_sql, results)
    except pymysql.err.Warning as err:
        print(‘\n{}    Warning: {}‘.format(l_name, err))
    except pymysql.err.MySQLError as err:
        print(‘\n{}    Error: {}‘.format(l_name, err))
        print(‘插入数据时出错...\n‘)
        connection.close()
        exit(10)

def get_prev_num(t_name, l_name):
    """取得今天已入库的行数 t_name:表名 l_name:日志文件名"""
    try:
        con_cur.execute(‘select min(id) from {0} where time_local=(‘
                        ‘select min(time_local) from {0} where time_local>="{1}")‘.format(t_name, today_start))
        min_id = con_cur.fetchone()[0]
        if min_id is not None:  # 假如有今天的数据
            con_cur.execute(‘select max(id) from {}‘.format(t_name))
            max_id = con_cur.fetchone()[0]
            con_cur.execute(
                ‘select count(*) from {} where id>={} and id<={} and server="{}"‘.format(t_name, min_id, max_id, server))
            prev_num = con_cur.fetchone()[0]
        else:
            prev_num = 0
        return prev_num
    except pymysql.err.MySQLError as err:
        print(‘Error: {}‘.format(err))
        print(‘Error:未取得已入库的行数,本次跳过{}\n‘.format(l_name))
        return

def del_old_data(t_name, l_name, n=3):
    """删除n天前的数据,n默认为3"""
    # n天前的日期间
    three_days_ago = time.strftime(‘%Y-%m-%d %H:%M:%S‘, time.localtime(time.time() - 3600 * 24 * n))
    try:
        con_cur.execute(‘select max(id) from {0} where time_local=(‘
                        ‘select max(time_local) from {0} where time_local!="0000-00-00 00:00:00" and time_local<="{1}")‘.format(
            t_name, three_days_ago))
        max_id = con_cur.fetchone()[0]
        if max_id is not None:
            con_cur.execute(‘delete from {} where id<={}‘.format(t_name, max_id))
    except pymysql.err.MySQLError as err:
        print(‘\n{}    Error: {}‘.format(l_name, err))
        print(‘未能删除表{}天前的数据...\n‘.format(n))

def main_loop(log_name):
    """主逻辑 log_name:日志文件名"""
    table_name = log_name.split(‘.access‘)[0].replace(‘.‘, ‘_‘)  # 将域名例如m.api转换成m_api,因为表名中不能包含‘.‘
    results = []
    # 创建表
    create_table(table_name)

    # 当前日志文件总行数
    num = int(subprocess.run(‘wc -l {}‘.format(log_dir + log_name), shell=True, stdout=subprocess.PIPE,
                             universal_newlines=True).stdout.split()[0])
    print(‘num: {}‘.format(num))  # debug
    # 上一次处理到的行数
    prev_num = get_prev_num(table_name, log_name)
    if prev_num is not None:
        # 根据当前行数和上次处理之后记录的行数对比,来决定本次要处理的行数范围
        i = 0
        with open(log_name) as fp:
            for line in fp:
                i += 1
                if i <= prev_num:
                    continue
                elif prev_num < i <= num:
                    insert_data(line, con_cur, results, 1000, table_name, log_name)
                else:
                    break
        # 插入不足1000行的results
        if len(results) > 0:
            insert_correct(con_cur, results, table_name, log_name)

    del_old_data(table_name, log_name)

if __name__ == "__main__":
    # 检测如果当前已经有该脚本在运行,则退出
    if_run = subprocess.run(‘ps -ef|grep {}|grep -v grep|grep -v "/bin/sh"|wc -l‘.format(argv[0]), shell=True,
                            stdout=subprocess.PIPE).stdout
    if if_run.decode().strip(‘\n‘) == ‘1‘:
        os.chdir(log_dir)
        logs_list = os.listdir(log_dir)
        logs_list = [i for i in logs_list if ‘access‘ in i and os.path.isfile(i) and i.split(‘.access‘)[0] in todo]
        if len(logs_list) > 0:
            # 并行
            with Pool(len(logs_list)) as p:
                p.map(main_loop, logs_list)

最后按照我们期望的间隔设置计划任务即可
*/30 * * * * export LANG=zh_CN.UTF-8;python3 /root/log_analyse_parall.py &&gt; /tmp/log_analyse.py3

关于这样一个对不确定格式的大量文本进行分析的脚本来说,通用性执行效率两个因素非常重要。通用性上文中已大致说明了原理;性能方面,经笔者在一台4核虚拟机上进行测试结果如下

# 4个日志文件共80w(每个20w)行记录,利用多进程并发处理,主进程派生出4个子进程来处理
# 处理时间
[ljk@git-svn ~]# time python3 shells/log_analyse_parall.py > /tmp/new_log.txt

real    0m24.057s
user    1m5.417s
sys     0m0.595s
# 4个进程平均每秒钟处理约3.3w行数据

Python+MySQL实现web日志分析

标签:日志分析   性能分析   故障分析   python日志分析   python+mongodb日志分析   

人气教程排行