coding: utf-8
import MySQLdb
import time
# common config
EXEC_DETAIL_FILE =
‘exec_detail.txt‘
DATETIME_FORMAT =
‘%Y-%m-%d %X‘
Default_MySQL_Host =
‘192.168.166.169‘
Default_MySQL_Port = 3358
Default_MySQL_User =
"mysql_admin"
Default_MySQL_Password =
‘mysql@Admin@Pwd‘
Default_MySQL_Charset =
"utf8"
Default_MySQL_Connect_TimeOut = 120
# Transfer Config
Transfer_Database_Name =
"db001"
Transfer_Source_Table_Name =
"tb2001"
Transfer_Target_Table_Name =
"tb2001_his"
Transfer_Condition =
"dt <‘2016-10-01‘"
Transfer_Rows_Per_Batch = 10000
Sleep_Second_Per_Batch = 0.5
def get_time_string(dt_time):
"""
获取指定格式的时间字符串
:param dt_time: 要转换成字符串的时间
:return: 返回指定格式的字符串
"""
global DATETIME_FORMAT
return time.strftime(DATETIME_FORMAT, dt_time)
def get_time_string(dt_time):
return time.strftime(
"%Y-%m-%d %X", dt_time)
def highlight(s):
return "%s[30;2m%s%s[1m" % (chr(27), s, chr(27
))
def print_warning_message(message):
"""
以红色字体显示消息内容
:param message: 消息内容
:return: 无返回值
"""
message =
str(message)
print(highlight(
‘‘) +
"%s[31;1m%s%s[0m" % (chr(27), message, chr(27
)))
def print_info_message(message):
"""
以绿色字体输出提醒性的消息
:param message: 消息内容
:return: 无返回值
"""
message =
str(message)
print(highlight(
‘‘) +
"%s[32;2m%s%s[0m" % (chr(27), message, chr(27
)))
def write_file(file_path, message):
"""
将传入的message追加写入到file_path指定的文件中
请先创建文件所在的目录
:param file_path: 要写入的文件路径
:param message: 要写入的信息
:return:
"""
file_handle = open(file_path,
‘a‘)
file_handle.writelines(message)
# 追加一个换行以方便浏览
file_handle.writelines(chr(13
))
file_handle.close()
def get_mysql_connection():
"""
根据默认配置返回数据库连接
:return: 数据库连接
"""
conn =
MySQLdb.connect(
host=
Default_MySQL_Host,
port=
Default_MySQL_Port,
user=
Default_MySQL_User,
passwd=
Default_MySQL_Password,
connect_timeout=
Default_MySQL_Connect_TimeOut,
charset=
Default_MySQL_Charset,
db=
Transfer_Database_Name
)
return conn
def mysql_exec(sql_script, sql_param=
None):
"""
执行传入的脚本,返回影响行数
:param sql_script:
:param sql_param:
:return: 脚本最后一条语句执行影响行数
"""
try:
conn =
get_mysql_connection()
print_info_message("在服务器{0}上执行脚本:{1}".format(
conn.get_host_info(), sql_script))
cursor =
conn.cursor()
if sql_param
is not None:
cursor.execute(sql_script, sql_param)
else:
cursor.execute(sql_script)
affect_rows =
cursor.rowcount
conn.commit()
cursor.close()
conn.close()
return affect_rows
except Exception as ex:
cursor.close()
conn.rollback()
raise Exception(str(ex))
def mysql_exec_many(sql_script_list):
"""
执行传入的脚本,返回影响行数
:param sql_script_list: 要执行的脚本List,List中每个元素为sql_script, sql_param对
:return: 返回执行每个脚本影响的行数列表
"""
try:
conn =
get_mysql_connection()
exec_result_list =
[]
for sql_script, sql_param
in sql_script_list:
print_info_message("在服务器{0}上执行脚本:{1}".format(
conn.get_host_info(), sql_script))
cursor =
conn.cursor()
if sql_param
is not None:
cursor.execute(sql_script, sql_param)
else:
cursor.execute(sql_script)
affect_rows =
cursor.rowcount
exec_result_list.append("影响行数:{0}".format(affect_rows))
conn.commit()
cursor.close()
conn.close()
return exec_result_list
except Exception as ex:
cursor.close()
conn.rollback()
raise Exception(str(ex))
def mysql_query(sql_script, sql_param=
None):
"""
执行传入的SQL脚本,并返回查询结果
:param sql_script:
:param sql_param:
:return: 返回SQL查询结果
"""
try:
conn =
get_mysql_connection()
print_info_message("在服务器{0}上执行脚本:{1}".format(
conn.get_host_info(), sql_script))
cursor =
conn.cursor()
if sql_param !=
‘‘:
cursor.execute(sql_script, sql_param)
else:
cursor.execute(sql_script)
exec_result =
cursor.fetchall()
cursor.close()
conn.close()
return exec_result
except Exception as ex:
cursor.close()
conn.close()
raise Exception(str(ex))
def get_column_info_list(table_name):
sql_script =
"""
DESC {0}
""".format(table_name)
column_info_list =
[]
query_result = mysql_query(sql_script=sql_script, sql_param=
None)
for row
in query_result:
column_name =
row[0]
column_key = row[3
]
column_info =
column_name, column_key
column_info_list.append(column_info)
return column_info_list
def get_id_range():
"""
按照传入的表获取要删除数据最大ID、最小ID、删除总行数
:return: 返回要删除数据最大ID、最小ID、删除总行数
"""
global Transfer_Condition
global Transfer_Rows_Per_Batch
sql_script =
"""
SELECT
MAX(ID) AS MAX_ID,
MIN(ID) AS MIN_ID,
COUNT(1) AS Total_Count
FROM {0}
WHERE {1};
""".format(Transfer_Source_Table_Name, Transfer_Condition)
query_result = mysql_query(sql_script=sql_script, sql_param=
None)
max_id, min_id, total_count =
query_result[0]
# 此处有一坑,可能出现total_count不为0 但是max_id 和min_id 为None的情况
# 因此判断max_id和min_id 是否为NULL
if (max_id
is None)
or (min_id
is None):
max_id, min_id, total_count =
0, 0, 0
return max_id, min_id, total_count
def check_env():
try:
source_columns_info_list =
get_column_info_list(Transfer_Source_Table_Name)
target_columns_info_list =
get_column_info_list(Transfer_Target_Table_Name)
if len(source_columns_info_list) !=
len(target_columns_info_list):
print_info("源表和目标表的列数不对,不满足迁移条件")
return False
column_count =
len(source_columns_info_list)
id_flag =
False
for column_id
in range(column_count):
source_column_name, source_column_key =
source_columns_info_list[column_id]
target_column_name, target_column_key =
target_columns_info_list[column_id]
if source_column_name !=
target_column_name:
print_info("源表和目标表的列名不匹配,不满足迁移条件")
return False
if source_column_name.lower() ==
‘id‘ and source_column_key.lower() ==
‘pri‘ and target_column_name.lower() ==
‘id‘ and target_column_key.lower() ==
‘pri‘:
id_flag =
True
if not id_flag:
print_info("未找到为主键的id列,不满足迁移条件")
return False
return True
except Exception as ex:
print_info("执行出现异常,异常为{0}".format(ex.message))
return False
def main():
flag =
check_env()
if not flag:
return
loop_trans_data()
def trans_data(current_min_id, current_max_id):
global Transfer_Source_Table_Name
global Transfer_Target_Table_Name
global Transfer_Condition
global Transfer_Rows_Per_Batch
print_info_message("*" * 70
)
copy_data_script =
"""
INSERT INTO {0}
SELECT * FROM {1}
WHERE ID>={2}
AND ID<{3}
AND {4} ;
""".format(Transfer_Target_Table_Name, Transfer_Source_Table_Name, current_min_id, current_max_id, Transfer_Condition)
delete_data_script =
"""
DELETE FROM {0}
WHERE ID IN (
SELECT ID
FROM {1}
WHERE ID>={2}
AND ID<{3})
AND ID>={4}
AND ID<{5};
""".format(Transfer_Source_Table_Name, Transfer_Target_Table_Name, current_min_id, current_max_id, current_min_id,
current_max_id)
sql_script_list =
[]
tem_sql_script =
copy_data_script, None
sql_script_list.append(tem_sql_script)
tem_sql_script =
delete_data_script, None
sql_script_list.append(tem_sql_script)
exec_result_list =
mysql_exec_many(sql_script_list)
print_info_message("执行结果:")
for item
in exec_result_list:
print_info_message(item)
def loop_trans_data():
max_id, min_id, total_count =
get_id_range()
if min_id ==
max_id:
print_info_message("无数据需要结转")
return
current_min_id =
min_id
global Transfer_Rows_Per_Batch
while current_min_id <=
max_id:
current_max_id = current_min_id +
Transfer_Rows_Per_Batch
trans_data(current_min_id, current_max_id)
current_percent = (current_max_id - min_id) * 100.0 / (max_id -
min_id)
left_rows = max_id -
current_max_id
if left_rows <
0:
left_rows =
0
current_percent_str =
"%.2f" %
current_percent
info =
"当前复制进度{0}/{1},剩余{2},进度为{3}%".format(current_max_id,
max_id, left_rows,
current_percent_str)
print_info_message(info)
time.sleep(Sleep_Second_Per_Batch)
current_min_id =
current_max_id
print_info_message("*" * 70
)
print_info_message("执行完成")
if __name__ ==
‘__main__‘:
main()
View Code
按照各位场景的,需要修改数据库连接信息:
还有需要迁移表的信息:
生成测试数据的mysql脚本:
CREATE TABLE `tb2001` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`c1` varchar(200) DEFAULT NULL,
`dt` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
create table tb2001_his like tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM mysql.user;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
View Code
最终运行结果如下:
显示简单粗暴,有兴趣的可以在此基础上修改!
=================================================================
python--同一mysql数据库下批量迁移数据
标签: