当前位置:Gxlcms > 数据库问题 > 将MySQL去重操作优化到极致之三弹连发(二):多线程并行执行

将MySQL去重操作优化到极致之三弹连发(二):多线程并行执行

时间:2021-07-01 10:21:17 帮助过:11人阅读

1. 查询出4份数据的created_time边界值

  1. select date_add(‘2017-01-01‘,interval 125000 second) dt1,
  2. date_add(‘2017-01-01‘,interval 2*125000 second) dt2,
  3. date_add(‘2017-01-01‘,interval 3*125000 second) dt3,
  4. max(created_time) dt4
  5. from t_source;
        查询结果如图一所示。

技术分享

图一

2. 查看每份数据的记录数,确认数据平均分布
  1. select case when created_time >= ‘2017-01-01‘
  2. and created_time < ‘2017-01-02 10:43:20‘
  3. then ‘2017-01-01‘
  4. when created_time >= ‘2017-01-02 10:43:20‘
  5. and created_time < ‘2017-01-03 21:26:40‘
  6. then ‘2017-01-02 10:43:20‘
  7. when created_time >= ‘2017-01-03 21:26:40‘
  8. and created_time < ‘2017-01-05 08:10:00‘
  9. then ‘2017-01-03 21:26:40‘
  10. else ‘2017-01-05 08:10:00‘
  11. end min_dt,
  12. case when created_time >= ‘2017-01-01‘
  13. and created_time < ‘2017-01-02 10:43:20‘
  14. then ‘2017-01-02 10:43:20‘
  15. when created_time >= ‘2017-01-02 10:43:20‘
  16. and created_time < ‘2017-01-03 21:26:40‘
  17. then ‘2017-01-03 21:26:40‘
  18. when created_time >= ‘2017-01-03 21:26:40‘
  19. and created_time < ‘2017-01-05 08:10:00‘
  20. then ‘2017-01-05 08:10:00‘
  21. else ‘2017-01-06 18:53:20‘
  22. end max_dt,
  23. count(*)
  24. from t_source
  25. group by case when created_time >= ‘2017-01-01‘
  26. and created_time < ‘2017-01-02 10:43:20‘
  27. then ‘2017-01-01‘
  28. when created_time >= ‘2017-01-02 10:43:20‘
  29. and created_time < ‘2017-01-03 21:26:40‘
  30. then ‘2017-01-02 10:43:20‘
  31. when created_time >= ‘2017-01-03 21:26:40‘
  32. and created_time < ‘2017-01-05 08:10:00‘
  33. then ‘2017-01-03 21:26:40‘
  34. else ‘2017-01-05 08:10:00‘
  35. end,
  36. case when created_time >= ‘2017-01-01‘
  37. and created_time < ‘2017-01-02 10:43:20‘
  38. then ‘2017-01-02 10:43:20‘
  39. when created_time >= ‘2017-01-02 10:43:20‘
  40. and created_time < ‘2017-01-03 21:26:40‘
  41. then ‘2017-01-03 21:26:40‘
  42. when created_time >= ‘2017-01-03 21:26:40‘
  43. and created_time < ‘2017-01-05 08:10:00‘
  44. then ‘2017-01-05 08:10:00‘
  45. else ‘2017-01-06 18:53:20‘
  46. end;

        查询结果如图二所示。

技术分享

图二


        4份数据的并集应该覆盖整个源数据集,并且数据之间是不重复的。也就是说4份数据的created_time要连续且互斥,连续保证处理全部数据,互斥确保了不需要二次查重。实际上这和时间范围分区的概念类似,或许用分区表更好些,只是这里省略了重建表的步骤。

3. 建立查重的存储过程
        有了以上信息我们就可以写出4条语句处理全部数据。为了调用接口尽量简单,建立下面的存储过程。
  1. delimiter //
  2. create procedure sp_unique(i smallint)
  3. begin
  4. set @a:=‘0000-00-00 00:00:00‘;
  5. set @b:=‘ ‘;
  6. if (i<4) then
  7. insert into t_target
  8. select * from t_source force index (idx_sort)
  9. where created_time >= date_add(‘2017-01-01‘,interval (i-1)*125000 second)
  10. and created_time < date_add(‘2017-01-01‘,interval i*125000 second)
  11. and (@a!=created_time or @b!=item_name)
  12. and (@a:=created_time) is not null
  13. and (@b:=item_name) is not null
  14. order by created_time,item_name;
  15. commit;
  16. else
  17. insert into t_target
  18. select * from t_source force index (idx_sort)
  19. where created_time >= date_add(‘2017-01-01‘,interval (i-1)*125000 second)
  20. and created_time <= date_add(‘2017-01-01‘,interval i*125000 second)
  21. and (@a!=created_time or @b!=item_name)
  22. and (@a:=created_time) is not null
  23. and (@b:=item_name) is not null
  24. order by created_time,item_name;
  25. commit;
  26. end if;
  27. end
  28. //
  29. delimiter ;

        查询的执行计划都如图三所示。

技术分享

图三


        mysql优化器进行索引范围扫描,并且使用索引条件下推(ICP)优化查询。

二、并行执行
        下面分别使用shell后台进程和MySQL Schedule Event实现并行。

1. shell后台进程

(1)建立duplicate_removal.sh文件,内容如下。
  1. #!/bin/bash
  2. mysql -vvv -u root -p123456 test -e "truncate t_target" &>/dev/null
  3. date ‘+%H:%M.%N‘
  4. for y in {1..4}
  5. do
  6. sql="call sp_unique($y)"
  7. mysql -vvv -u root -p123456 test -e "$sql" &>par_sql1_$y.log &
  8. done
  9. wait
  10. date ‘+%H:%M.%N‘

(2)执行脚本文件

  1. chmod 755 duplicate_removal.sh
  2. ./duplicate_removal.sh

        执行输出入图四所示。

技术分享

图四

        这种方法用时3.4秒,并行执行的4个过程调用分别用时如图五所示。

技术分享

图五

        可以看到,每个过程的执行时间均不到3.4秒,因为是并行执行,总的过程执行时间也小于3.4秒,比单线程sql速度提高了近3倍。

2. MySQL Schedule Event
        吴老师也用到了并行,但他是利用MySQL自带的Schedule Event功能实现的,代码应该和下面的类似。

(1)建立事件历史日志表

  1. -- 用于查看事件执行时间等信息
  2. create table t_event_history (
  3. dbname varchar(128) not null default ‘‘,
  4. eventname varchar(128) not null default ‘‘,
  5. starttime datetime(3) not null default ‘0000-00-00 00:00:00‘,
  6. endtime datetime(3) default null,
  7. issuccess int(11) default null,
  8. duration int(11) default null,
  9. errormessage varchar(512) default null,
  10. randno int(11) default null
  11. );

(2)修改event_scheduler参数

  1. set global event_scheduler = 1;

(3)为每个并发线程创建一个事件

  1. delimiter //
  2. create event ev1 on schedule at current_timestamp + interval 1 hour on completion preserve disable do
  3. begin
  4. declare r_code char(5) default ‘00000‘;
  5. declare r_msg text;
  6. declare v_error integer;
  7. declare v_starttime datetime default now(3);
  8. declare v_randno integer default floor(rand()*100001);
  9. insert into t_event_history (dbname,eventname,starttime,randno)
  10. #作业名
  11. values(database(),‘ev1‘, v_starttime,v_randno);
  12. begin
  13. #异常处理段
  14. declare continue handler for sqlexception
  15. begin
  16. set v_error = 1;
  17. get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;
  18. end;
  19. #此处为实际调用的用户程序过程
  20. call sp_unique(1);
  21. end;
  22. update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat(‘error=‘,r_code,‘, message=‘,r_msg),randno=null where starttime=v_starttime and randno=v_randno;
  23. end
  24. //
  25. create event ev2 on schedule at current_timestamp + interval 1 hour on completion preserve disable do
  26. begin
  27. declare r_code char(5) default ‘00000‘;
  28. declare r_msg text;
  29. declare v_error integer;
  30. declare v_starttime datetime default now(3);
  31. declare v_randno integer default floor(rand()*100001);
  32. insert into t_event_history (dbname,eventname,starttime,randno)
  33. #作业名
  34. values(database(),‘ev2‘, v_starttime,v_randno);
  35. begin
  36. #异常处理段
  37. declare continue handler for sqlexception
  38. begin
  39. set v_error = 1;
  40. get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;
  41. end;
  42. #此处为实际调用的用户程序过程
  43. call sp_unique(2);
  44. end;
  45. update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat(‘error=‘,r_code,‘, message=‘,r_msg),randno=null where starttime=v_starttime and randno=v_randno;
  46. end
  47. //
  48. create event ev3 on schedule at current_timestamp + interval 1 hour on completion preserve disable do
  49. begin
  50. declare r_code char(5) default ‘00000‘;
  51. declare r_msg text;
  52. declare v_error integer;
  53. declare v_starttime datetime default now(3);
  54. declare v_randno integer default floor(rand()*100001);
  55. insert into t_event_history (dbname,eventname,starttime,randno)
  56. #作业名
  57. values(database(),‘ev3‘, v_starttime,v_randno);
  58. begin
  59. #异常处理段
  60. declare continue handler for sqlexception
  61. begin
  62. set v_error = 1;
  63. get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;
  64. end;
  65. #此处为实际调用的用户程序过程
  66. call sp_unique(3);
  67. end;
  68. update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat(‘error=‘,r_code,‘, message=‘,r_msg),randno=null where starttime=v_starttime and randno=v_randno;
  69. end
  70. //
  71. create event ev4 on schedule at current_timestamp + interval 1 hour on completion preserve disable do
  72. begin
  73. declare r_code char(5) default ‘00000‘;
  74. declare r_msg text;
  75. declare v_error integer;
  76. declare v_starttime datetime default now(3);
  77. declare v_randno integer default floor(rand()*100001);
  78. insert into t_event_history (dbname,eventname,starttime,randno)
  79. #作业名
  80. values(database(),‘ev4‘, v_starttime,v_randno);
  81. begin
  82. #异常处理段
  83. declare continue handler for sqlexception
  84. begin
  85. set v_error = 1;
  86. get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;
  87. end;
  88. #此处为实际调用的用户程序过程
  89. call sp_unique(4);
  90. end;
  91. update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat(‘error=‘,r_code,‘, message=‘,r_msg),randno=null where starttime=v_starttime and randno=v_randno;
  92. end
  93. //
  94. delimiter ;

        说明:为了记录每个事件执行的时间,在事件定义中增加了操作日志表的逻辑,因为每个事件中只多执行了一条insert,一条update,4个事件总共多执行8条很简单的语句,对测试的影响可以忽略不计。执行时间精确到毫秒。

(4)触发事件执行

  1. mysql -vvv -u root -p123456 test -e "truncate t_target;alter event ev1 on schedule at current_timestamp enable;alter event ev2 on schedule at current_timestamp enable;alter event ev3 on schedule at current_timestamp enable;alter event ev4 on schedule at current_timestamp enable;"

        说明:该命令行顺序触发了4个事件,但不会等前一个执行完才执行下一个,而是立即向下执行。从图六的输出也可以清楚地看到这一点。因此四次过程调用是并行执行的。

技术分享图六
(5)查看事件执行日志
  1. select * from t_event_history;

        查询结果如图7所示。

技术分享

图七


        可以看到,每个过程的执行均为3.5秒,又因为是并行执行的,因此总的执行之间也是3.5秒,优化效果和shell后台进程方式几乎相同。

参考:
Increasing slow query performance with the parallel query execution
Mysql Event 调度历史记录


将MySQL去重操作优化到极致之三弹连发(二):多线程并行执行

标签:分布   执行   where   存储过程   concat   bin   lte   分区表   日志   

人气教程排行