当前位置:Gxlcms > 数据库问题 > 基于binlog来分析mysql的行记录修改情况(python脚本分析)

基于binlog来分析mysql的行记录修改情况(python脚本分析)

时间:2021-07-01 10:21:17 帮助过:24人阅读

      有些情况下,可能会统计在某个时间段内,MySQL修改了多少数据量?发生了多少事务?主要是哪些表格发生变动?变动的数量是怎么样的? 但是却不需要行记录的修改内容,只需要了解 行数据的 变动情况。     这些情况部分可以通过监控来大致了解,但是也可以基于binlog来全盘分析,binlog的格式是row模式。     在写flashback的时候,顺带把这个也写了个脚步,使用python编写,都差不多原理,只是这个简单些,介于个人python弱的不行,性能可能还有很大的提升空间,也希望园友能协助优化下。     先贴python脚步的分析结果图如下,分为4个部分:事务耗时情况、事务影响行数情况、DML行数情况以及操作最频繁表格情况。 技术分享

2 脚本简单描述

    脚本依赖的模块中,pymysql需要自行安装。     创建类queryanalyse,其中有5个函数定义:_get_db、create_tab、rowrecord、binlogdesc跟closeconn。

2.1 _get_db

    该函数用来解析输入参数值,参数值一共有7个,都是必须填写的。分别为host,user,password,port,table name for transaction,table name for records,对应的简写如下:   ALL options need to assign:   -h    : host, the database host,which database will store the results after analysis -u    : user, the db user -p    : password, the db user‘s password -P    : port, the db port -f    : file path, the binlog file -tr    : table name for record , the table name to store the row record -tt    : table name for transaction, the table name to store transactions     比如,执行脚本:python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow,该函数负责处理各个选项的参数值情况,并存储。

2.2 create_tab

    创建两个表格,分别用来存储 binlog file文件的分析结果。一个用来存储事务的执行开始时间跟结束时间,由选项 -tt来赋值表名;一个是用来存储每一行记录的修改情况,由选项 -tr来赋值表名。     事务表记录内容:事务的开始时间及事务的结束时间。     行记录表的内容:库名,表名,DML类型以及事务对应事务表的编号。  
root@localhost:mysql3310.sock  14:42:29 [flashback]>show create table tbrow \G
*************************** 1. row ***************************
       Table: tbrow
Create Table: CREATE TABLE `tbrow` (
  `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `sqltype` int(11) NOT NULL COMMENT 1 is insert,2 is update,3 is delete,
  `tran_num` int(11) NOT NULL COMMENT the transaction number,
  `dbname` varchar(50) NOT NULL,
  `tbname` varchar(50) NOT NULL,
  PRIMARY KEY (`auto_id`),
  KEY `sqltype` (`sqltype`),
  KEY `dbname` (`dbname`),
  KEY `tbname` (`tbname`)
) ENGINE=InnoDB AUTO_INCREMENT=295151 DEFAULT CHARSET=utf8
1 row in set (0.00 sec)
 
root@localhost:mysql3310.sock  14:42:31 [flashback]>SHOW CREATE TABLE TBTRAN \G
*************************** 1. row ***************************
       Table: TBTRAN
Create Table: CREATE TABLE `tbtran` (
  `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `begin_time` datetime NOT NULL,
  `end_time` datetime NOT NULL,
  PRIMARY KEY (`auto_id`)
) ENGINE=InnoDB AUTO_INCREMENT=6390 DEFAULT CHARSET=utf8
1 row in set (0.00 sec)

 

2.3 rowrecord

    重点函数,分析binlog文件内容。这里有几个规律:
  1. 每个事务的结束点,是以 ‘Xid = ‘ 来查找
    1. 事务的开始时间,是事务内的第一个 ‘Table_map‘ 行里边的时间
    2. 事务的结束时间,是以 ‘Xid = ‘所在行的 里边的时间
  2. 每个行数据是属于哪个表格,是以 ‘Table_map‘来查找
  3. DML的类型是按照 行记录开头的情况是否为:‘### INSERT INTO‘  、‘### UPDATE‘ 、‘### DELETE FROM‘ 
  4. 注意,单个事务可以包含多个表格多种DML多行数据修改的情况。

2.4 binlogdesc

    描述分析结果,简单4个SQL分析。
  1. 分析修改行数据的 事务耗时情况
  2. 分析修改行数据的 事务影响行数情况
  3. 分析DML分布情况
  4. 分析 最多DML操作的表格 ,取前十个分析

2.5 closeconn

    关闭数据库连接。

3 使用说明

    首先,确保python安装了pymysql模块,把python脚本拷贝到文件 queryanalyse.py。     然后,把要分析的binlog文件先用 mysqlbinlog 指令分析存储,具体binlog的文件说明,可以查看之前的博文:关于binary log那些事——认真码了好长一篇。mysqlbinlog的指令使用方法,可以详细查看文档:https://dev.mysql.com/doc/refman/5.7/en/mysqlbinlog.html 。     比较常用通过指定开始时间跟结束时间来分析 binlog文件。 mysqlbinlog --start-datetime=‘2017-04-23 00:00:03‘ --stop-datetime=‘2017-04-23 00:30:00‘ --base64-output=decode-rows -v /data/mysql/logs/mysql-bin.007335 > /tmp/binlog_test.log           分析后,可以把这个 binlog_test.log文件拷贝到其他空闲服务器执行分析,只需要有个空闲的DB来存储分析记录即可。     假设这个时候,拷贝 binlog_test.log到测试服务器上,测试服务器上的数据库可以用来存储分析内容,则可以执行python脚本了,注意要进入到python脚本的目录中,或者指定python脚本路径。   python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f= /tmp/binlog_test.log -tt=flashback.tbtran -tr=flashback.tbrow       没了,就等待输出吧。     性能是硬伤,在虚拟机上测试,大概500M的binlog文件需要分析2-3min,有待提高!

4 python脚本

  1 import pymysql
  2 from pymysql.cursors import DictCursor
  3 import re
  4 import os
  5 import sys
  6 import datetime
  7 import time
  8 import logging
  9 import importlib
 10 importlib.reload(logging)
 11 logging.basicConfig(level=logging.DEBUG,format=%(asctime)s %(levelname)s %(message)s )
 12 
 13 
 14 usage=‘‘‘ usage: python [script‘s path] [option]
 15 ALL options need to assign:
 16 
 17 -h     : host, the database host,which database will store the results after analysis 
 18 -u     : user, the db user
 19 -p     : password, the db user‘s password
 20 -P     : port, the db port
 21 -f     : file path, the binlog file
 22 -tr    : table name for record , the table name to store the row record
 23 -tt    : table name for transaction, the table name to store transactions
 24 Example: python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow
 25 
 26 ‘‘‘
 27 
 28 class queryanalyse:
 29     def __init__(self):
 30         #初始化
 31         self.host=‘‘
 32         self.user=‘‘
 33         self.password=‘‘
 34         self.port=3306
 35         self.fpath=‘‘
 36         self.tbrow=‘‘
 37         self.tbtran=‘‘
 38 
 39         self._get_db()
 40         logging.info(assign values to parameters is done:host={},user={},password=***,port={},fpath={},tb_for_record={},tb_for_tran={}.format(self.host,self.user,self.port,self.fpath,self.tbrow,self.tbtran))
 41 
 42         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset=utf8)
 43         self.cur = self.mysqlconn.cursor(cursor=DictCursor)
 44         logging.info(MySQL which userd to store binlog event connection is ok)
 45 
 46         self.begin_time=‘‘
 47         self.end_time=‘‘
 48         self.db_name=‘‘
 49         self.tb_name=‘‘
 50 
 51     def _get_db(self):
 52         #解析用户输入的选项参数值,这里对password的处理是明文输入,可以自行处理成是input格式,
 53         #由于可以拷贝binlog文件到非线上环境分析,所以password这块,没有特殊处理
 54         logging.info(begin to assign values to parameters)
 55         if len(sys.argv) == 1:
 56             print(usage)
 57             sys.exit(1)
 58         elif sys.argv[1] == --help:
 59             print(usage)
 60             sys.exit()
 61         elif len(sys.argv) > 2:
 62             for i in sys.argv[1:]:
 63                 _argv = i.split(=)
 64                 if _argv[0] == -h:
 65                     self.host = _argv[1]
 66                 elif _argv[0] == -u:
 67                     self.user = _argv[1]
 68                 elif _argv[0] == -P:
 69                     self.port = int(_argv[1])
 70                 elif _argv[0] == -f:
 71                     self.fpath = _argv[1]
 72                 elif _argv[0] == -tr:
 73                     self.tbrow = _argv[1]
 74                 elif _argv[0] == -tt:
 75                     self.tbtran = _argv[1]
 76                 elif _argv[0] == -p:
 77                     self.password = _argv[1]
 78                 else:
 79                     print(usage)
 80 
 81     def create_tab(self):
 82         #创建两个表格:一个用户存储事务情况,一个用户存储每一行数据修改的情况
 83         #注意,一个事务可以存储多行数据修改的情况
 84         logging.info(creating table ...)
 85         create_tb_sql =‘‘‘CREATE TABLE IF NOT EXISTS  {} (
 86                           `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
 87                           `begin_time` datetime NOT NULL,
 88                           `end_time` datetime NOT NULL,
 89                           PRIMARY KEY (`auto_id`)
 90                         ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 91                         CREATE TABLE IF NOT EXISTS  {} (
 92                           `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
 93                           `sqltype` int(11) NOT NULL COMMENT ‘1 is insert,2 is update,3 is delete‘,
 94                           `tran_num` int(11) NOT NULL COMMENT ‘the transaction number‘,
 95                           `dbname` varchar(50) NOT NULL,
 96                           `tbname` varchar(50) NOT NULL,
 97                           PRIMARY KEY (`auto_id`),
 98                           KEY `sqltype` (`sqltype`),
 99                           KEY `dbname` (`dbname`),
100                           KEY `tbname` (`tbname`)
101                         ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
102                         truncate table {};
103                         truncate table {};
104                         ‘‘‘.format(self.tbtran,self.tbrow,self.tbtran,self.tbrow)
105 
106         self.cur.execute(create_tb_sql)
107         logging.info(created table {} and {}.format(self.tbrow,self.tbtran))
108 
109     def rowrecord(self):
110         #处理每一行binlog
111         #事务的结束采用 ‘Xid =‘ 来划分
112         #分析结果,按照一个事务为单位存储提交一次到db
113         try:
114             tran_num=1    #事务数
115             record_sql=‘‘ #行记录的insert sql
116             tran_sql=‘‘   #事务的insert sql
117 
118             self.create_tab()
119 
120             with open(self.fpath,r) as binlog_file:
121                 logging.info(begining to analyze the binlog file ,this may be take a long time !!!)
122                 logging.info(analyzing...)
123 
124                 for bline in binlog_file:
125 
126                     if bline.find(Table_map:) != -1:
127                         l = bline.index(server)
128                         n = bline.index(Table_map)
129                         begin_time = bline[:l:].rstrip( ).replace(#, 20)
130 
131                         if record_sql==‘‘:
132                             self.begin_time = begin_time[0:4] + - + begin_time[4:6] + - + begin_time[6:]
133 
134                         self.db_name = bline[n::].split( )[1].replace(`, ‘‘).split(.)[0]
135                         self.tb_name = bline[n::].split( )[1].replace(`, ‘‘).split(.)[1]
136                         bline=‘‘
137 
138                     elif bline.startswith(### INSERT INTO):
139                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (1,{},‘{}‘,‘{}‘);".format(self.tbrow,tran_num,self.db_name,self.tb_name)
140 
141                     elif bline.startswith(### UPDATE):
142                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (2,{},‘{}‘,‘{}‘);".format(self.tbrow,tran_num,self.db_name,self.tb_name)
143 
144                     elif bline.startswith(### DELETE FROM):
145                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (3,{},‘{}‘,‘{}‘);".format(self.tbrow,tran_num,self.db_name,self.tb_name)
146 
147                     elif bline.find(Xid =) != -1:
148 
149                         l = bline.index(server)
150                         end_time = bline[:l:].rstrip( ).replace(#, 20)
151                         self.end_time = end_time[0:4] + - + end_time[4:6] + - + end_time[6:]
152                         tran_sql=record_sql+"insert into {}(begin_time,end_time) VALUES (‘{}‘,‘{}‘)".format(self.tbtran,self.begin_time,self.end_time)
153 
154                         self.cur.execute(tran_sql)
155                         self.mysqlconn.commit()
156                         record_sql = ‘‘
157                         tran_num += 1
158 
159         except Exception:
160             return funtion rowrecord error
161 
162     def binlogdesc(self):
163         sql=‘‘
164         t_num=0
165         r_num=0
166         logging.info(Analysed result printing...\n)
167         #分析总的事务数跟行修改数量
168         sql="select ‘tbtran‘ name,count(*) nums from {}  union all select ‘tbrow‘ name,count(*) nums from {};".format(self.tbtran,self.tbrow)
169         self.cur.execute(sql)
170         rows=self.cur.fetchall()
171         for row in rows:
172             if row[name]==tbtran:
173                 t_num = row[nums]
174             else:
175                 r_num = row[nums]
176         print(This binlog file has {} transactions, {} rows are changed .format(t_num,r_num))
177 
178         # 计算 最耗时 的单个事务
179         # 分析每个事务的耗时情况,分为5个时间段来描述
180         # 这里正常应该是 以毫秒来分析的,但是binlog中,只精确时间到second
181         sql=‘‘‘select 
182                       count(case when cost_sec between 0 and 1 then 1 end ) cos_1,
183                       count(case when cost_sec between 1.1 and 5 then 1 end ) cos_5,
184                       count(case when cost_sec between 5.1 and 10 then 1 end ) cos_10,
185                       count(case when cost_sec between 10.1 and 30 then 1 end ) cos_30,
186                       count(case when cost_sec >30.1 then 1 end ) cos_more,
187                       max(cost_sec) cos_max
188                 from 
189                 (
190                         select 
191                             auto_id,timestampdiff(second,begin_time,end_time) cost_sec
192                         from {}
193                 ) a;‘‘‘.format(self.tbtran)
194         self.cur.execute(sql)
195         rows=self.cur.fetchall()
196 
197         for row in rows:
198             print(The most cost time : {} .format(row[cos_max]))
199             print(The distribution map of each transaction costed time: )
200             print(Cost time between    0 and  1 second : {} , {}%.format(row[cos_1],int(row[cos_1]*100/t_num)))
201             print(Cost time between  1.1 and  5 second : {} , {}%.format(row[cos_5], int(row[cos_5] * 100 / t_num)))
202             print(Cost time between  5.1 and 10 second : {} , {}%.format(row[cos_10], int(row[cos_10] * 100 / t_num)))
203             print(Cost time between 10.1 and 30 second : {} , {}%.format(row[cos_30], int(row[cos_30] * 100 / t_num)))
204             print(Cost time                     > 30.1 : {} , {}%\n.format(row[cos_more], int(row[cos_more] * 100 / t_num)))
205 
206         # 计算 单个事务影响行数最多 的行数量
207         # 分析每个事务 影响行数 情况,分为5个梯度来描述
208         sql=‘‘‘select 
209                     count(case when nums between 0 and 10 then 1 end ) row_1,
210                     count(case when nums between 11 and 100 then 1 end ) row_2,
211                     count(case when nums between 101 and 1000 then 1 end ) row_3,
212                     count(case when nums between 1001 and 10000 then 1 end ) row_4,
213                     count(case when nums >10001 then 1 end ) row_5,
214                     max(nums) row_max
215                from 
216                   (
217                     select 
218                              count(*) nums
219                     from {} group by tran_num
220                    ) a;‘‘‘.format(self.tbrow)
221         self.cur.execute(sql)
222         rows=self.cur.fetchall()
223 
224         for row in rows:
225             print(The most changed rows for each row: {} .format(row[row_max]))
226             print(The distribution map of each transaction changed rows : )
227             print(Changed rows between    1 and    10 second : {} , {}%.format(row[row_1],int(row[row_1]*100/t_num)))
228             print(Changed rows between   11 and   100 second : {} , {}%.format(row[row_2], int(row[row_2] * 100 / t_num)))
229             print(Changed rows between  101 and  1000 second : {} , {}%.format(row[row_3], int(row[row_3] * 100 / t_num)))
230             print(Changed rows between 1001 and 10000 second : {} , {}%.format(row[row_4], int(row[row_4] * 100 / t_num)))
231             print(Changed rows                       > 10001 : {} , {}%\n.format(row[row_5], int(row[row_5] * 100 / t_num)))
232 
233         # 分析 各个行数 DML的类型情况
234         # 描述 delete,insert,update的分布情况
235         sql=select sqltype ,count(*) nums from {} group by sqltype ;.format(self.tbrow)
236         self.cur.execute(sql)
237         rows=self.cur.fetchall()
238 
239         print(The distribution map of the {} changed rows : .format(r_num))
240         for row in rows:
241 
242             if row[sqltype]==1:
243                 print(INSERT rows :{} , {}% .format(row[nums],int(row[nums]*100/r_num)))
244             if row[sqltype]==2:
245                 print(UPDATE rows :{} , {}% .format(row[nums],int(row[nums]*100/r_num)))
246             if row[sqltype]==3:
247                 print(DELETE rows :{} , {}%\n .format(row[nums],int(row[nums]*100/r_num)))
248 
249         # 描述 影响行数 最多的表格
250         # 可以分析是哪些表格频繁操作,这里显示前10个table name
251         sql = ‘‘‘select 
252                       dbname,tbname ,
253                       count(*) ALL_rows,
254                       count(*)*100/{} per,
255                       count(case when sqltype=1 then 1 end) INSERT_rows,
256                       count(case when sqltype=2 then 1 end) UPDATE_rows,
257                       count(case when sqltype=3 then 1 end) DELETE_rows
258                 from {} 
259                 group by dbname,tbname 
260                 order by ALL_rows desc 
261                 limit 10;‘‘‘.format(r_num,self.tbrow)
262         self.cur.execute(sql)
263         rows = self.cur.fetchall()
264 
265         print(The distribution map of the {} changed rows : .format(r_num))
266         print(tablename.ljust(50),
267               |,changed_rows.center(15),
268               |,percent.center(10),
269               |,insert_rows.center(18),
270               |,update_rows.center(18),
271               |,delete_rows.center(18)
272               )
273         print(-------------------------------------------------------------------------------------------------------------------------------------------------)
274         for row in rows:
275             print((row[dbname]+.+row[tbname]).ljust(50),
276                   |,str(row[ALL_rows]).rjust(15),
277                   |,(str(int(row[per]))+%).rjust(10),
278                   |,str(row[INSERT_rows]).rjust(10)+ , +(str(int(row[INSERT_rows]*100/row[ALL_rows]))+%).ljust(5),
279                   |,str(row[
                        
                    

人气教程排行