当前位置:Gxlcms > 数据库问题 > Python开发MySQL数据库(表)克隆工具

Python开发MySQL数据库(表)克隆工具

时间:2021-07-01 10:21:17 帮助过:21人阅读

前段时间因为某些需求,需要频繁的克隆MySQL的某些数据库或者某几个表。手动导出导入操作了几次,感觉甚是繁琐,而且效率不高,出错几率过大。索性抽时间用Python开发了一个MySQL的clone工具,用了一段时间,将数据库或者表克隆到同一台服务器的话(即源数据库与目标数据库在同一主机上),百万条数据也就是几十秒的时间搞定。该工具也支持将本地数据库或者表克隆到远程主机上。


程序比较简单,就一个Python文件,原理就是主要使用了MySQL的LOAD DATA INFILE命令。先来看下工具帮助信息:


[root@iZ2876x9bezZ ~]# python mysqlclone.py --help
usage: mysqlclone.py [-h] [--sourceHost SOURCEHOST] [--sourcePort SOURCEPORT]
                     [--sourcePasswd SOURCEPASSWD] [--sourceUser SOURCEUSER]
                     [--sourceDb SOURCEDB] [--dstDb DSTDB]
                     [--sourceTable SOURCETABLE] [--dstHost DSTHOST]
                     [--dstPort DSTPORT] [--dstPasswd DSTPASSWD]
                     [--dstUser DSTUSER] [--no-data] [--lock-all-tables]
                     [--events] [--routines] [--triggers]

optional arguments:
  -h, --help            show this help message and exit
  --sourceHost SOURCEHOST
                        The source database host,default[localhost]
  --sourcePort SOURCEPORT
                        The source database port,defalut[3306]
  --sourcePasswd SOURCEPASSWD
                        The source databas passwd,default[NULL]
  --sourceUser SOURCEUSER
                        The source databas username,default[root]
  --sourceDb SOURCEDB   The source databas name
  --dstDb DSTDB         The dst databas name
  --sourceTable SOURCETABLE
                        The source table,default[None]
  --dstHost DSTHOST     The dst database host,default[localhost]
  --dstPort DSTPORT     The dst database port,defalut[3306]
  --dstPasswd DSTPASSWD
                        The dst databas passwd,default[NULL]
  --dstUser DSTUSER     The dst databas username,default[root]
  --no-data, -d         No row information;False is default
  --lock-all-tables, -X
                        Locks all tables,default[False:Lock the table to be
                        read]
  --events, -E          Clone events,default[False]
  --routines, -R        Clone stored routines (functions and
                        procedures),default[False]
  --triggers            Clone triggers for each dumped
                        table,default[False]

简单介绍下几个选项参数:

--sourceHost   指定源数据库主机的IP或者主机名
--sourcePasswd 源数据库主机的密码
--sourceUser 源数据库主机的用户名
--sourceDb   指定要克隆的源数据库
--sourceTable 指定要克隆的源数据表,该值可以不指定,若不指定的话则为克隆上边指定数据库里的所有表
--dstHost 目标数据库主机IP或者主机名
--dstPasswd 目标数据库密码
--dstUser  目标数据库用户名
--no-data|-d 指定该参数时,只克隆表结构信息,不克隆数据。
--lock-all-tables|-X 指定该参数时,表示将整个数据库的所有表都加上读所。默认情况下只是给单个表加读所。
--events|-E 克隆events
--routines|-R 克隆function和procedure
--triggers 克隆triggers


示例,将test数据库整个clone到test3库中,test3库为提前创建好的一个空库。

[root@iZ2876x9bezZ ~]# python mysqlclone.py --sourceHost 127.0.0.1 --sourceUser root --sourcePasswd xxx --dstHost 127.0.0.1 --dstUser root --dstPasswd xxx --sourceDb test --dstDb test3 --no-data -E -R -X

  [2015-07-10 11:44:44] INFO: - Start clone database [127.0.0.1:test] To [127.0.0.1:test3]
[2015-07-10 11:44:44] INFO: - table [pcore_information_information] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_information_reply] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_item_item] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_member_consignee_address] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_member_member] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_member_region] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_member_region_district] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_member_region_district_exist] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_member_session] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_member_usergroup] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_passport_adminsession] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_passport_member] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_passport_memberfield] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_passport_seccode] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_passport_session] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_passport_usergroup] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_system_data] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_system_field] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_system_file] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_system_global] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_system_module] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_system_schedule] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_trade_cart] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_trade_order] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_trade_order_detail] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_trade_order_return_goods] was transferred.


脚本代码:

#!/bin/env python
#coding=utf-8

import MySQLdb,datetime,sys,time,os,argparse
from abc import ABCMeta, abstractmethod
__author__ = "liuzhenwei"
__version__ = "1.2"

class Clone():
	__metaclass__ = ABCMeta

	def __init__(self,**kwargs):
		self.pid = os.getpid()
		self.sourceCur = kwargs[‘sourceCur‘]
		self.dstCur =  kwargs[‘dstCur‘]
		self.noData =  kwargs[‘noData‘]
		self.sourceTable =  kwargs[‘sourceTable‘]
		self.lockAllTables =  kwargs[‘lockAllTables‘]
		self.sourceDb = kwargs[‘sourceDb‘]
		#print kwargs
	@abstractmethod
	def clone(self):
		pass

class MySQLClone(object):

	def __init__(self,**kwargs):
		self.sourceTable = kwargs[‘sourceTable‘]
		self.noData = kwargs[‘noData‘]
		self.lockAllTables = kwargs[‘lockAllTables‘]
		self.triggers = kwargs[‘triggers‘]
		self.routines = kwargs[‘routines‘]
		self.events = kwargs[‘events‘]
		self.sourceDb = kwargs[‘sourceDb‘]
		try:
			self.sourceConn=MySQLdb.connect(host=kwargs[‘sourceHost‘],user=kwargs[‘sourceUser‘],passwd=kwargs[‘sourcePasswd‘],port=kwargs[‘sourcePort‘])
			self.sourceCur=self.sourceConn.cursor()
			self.sourceConn.select_db(kwargs[‘sourceDb‘])
			self.sourceCur.execute("SET NAMES utf8;")
			#self.sourceCur.execute("SET AUTOCOMMIT=0;")
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])

		try:
			self.dstConn=MySQLdb.connect(host=kwargs[‘dstHost‘],user=kwargs[‘dstUser‘],passwd=kwargs[‘dstPasswd‘],port=kwargs[‘dstPort‘],local_infile=1)
			self.dstCur=self.dstConn.cursor()
			self.dstConn.select_db(kwargs[‘dstDb‘])
			self.dstCur.execute("SET NAMES utf8;")
			self.dstCur.execute("SET AUTOCOMMIT=0;")
			#stop bin log
			self.dstCur.execute("SET sql_log_bin=0;")
			#
			#self.dstCur.execute("SET GLOBAL innodb_flush_log_at_trx_commit=2;")
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])

	def __dstCommit(self):
	 	 self.dstConn.commit()
    
	def __sourceColse(self):
		self.sourceCur.close()
		self.sourceConn.close()

	def __dstColse(self):
		self.dstCur.close()
		self.dstConn.close()

	def clone(self):
		kwargs = {}
		kwargs[‘sourceCur‘] = self.sourceCur
		kwargs[‘dstCur‘] = self.dstCur
		kwargs[‘noData‘] = self.noData
		kwargs[‘sourceTable‘] = self.sourceTable
		kwargs[‘lockAllTables‘] = self.lockAllTables
		kwargs[‘sourceDb‘] = self.sourceDb
		c = DatabaseClone(**kwargs)
		c.clone()

		if self.triggers:
			t = TriggersClone(**kwargs)
			t.clone()

		if self.routines:
			#print kwargs
			r = RoutinesClone(**kwargs)
			r.clone()

		if self.events:
			e = EventsClone(**kwargs)
			e.clone()

	def __del__(self):
		self.__dstCommit()
		self.__sourceColse()
		self.__dstColse()


class DatabaseClone(Clone):
	‘‘‘
	clone database
	‘‘‘
	def __init__(self,**kwargs):
		Clone.__init__(self,**kwargs)
	def __cloneSingleTable(self,table):
		#create table
		count = self.sourceCur.execute(‘SHOW CREATE TABLE %s;‘ % table)
		results=self.sourceCur.fetchone()
		try:
			self.dstCur.execute(results[1])
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
			return False
		else:
			if self.noData:
				PrintInfo.say(‘table‘,table)
		#load data
		if not self.noData:
			tmpFile = ‘/tmp/‘+table+‘_‘+str(self.pid)
			try:
				outFileSQL = "SELECT * INTO OUTFILE ‘%s‘ FIELDS TERMINATED BY ‘,‘ OPTIONALLY ENCLOSED BY ‘\"‘ LINES TERMINATED BY ‘\n‘ FROM %s;" % (tmpFile,table)
				count = self.sourceCur.execute(outFileSQL)
			except MySQLdb.Error,e:
				print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
			else:
				self.__loadData(tmpFile,table)
		if not self.lockAllTables:
			self.sourceCur.execute("COMMIT;")
			self.sourceCur.execute("UNLOCK TABLES;")
	
	def __loadData(self,dataFile,table):
		try:
			intoFileSQL = "LOAD DATA LOCAL INFILE ‘%s‘ INTO TABLE %s CHARACTER SET utf8 FIELDS TERMINATED BY ‘,‘ OPTIONALLY ENCLOSED BY ‘\"‘ LINES TERMINATED BY ‘\n‘;" % (dataFile,table)
			self.dstCur.execute(intoFileSQL)
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
		else:
			os.remove(dataFile)
			PrintInfo.say(‘table‘,table)

	def clone(self):
		tableList = []
		if self.sourceTable:
			tableList.append(self.sourceTable)
		else:
			count = self.sourceCur.execute(‘SHOW TABLES;‘)
			results=self.sourceCur.fetchall()
			for table in results:
				tableList.append(table[0])
			#lock all tables
			if self.lockAllTables:
				lockSQL = "LOCK TABLES %s;" % ‘,‘.join([t+‘ READ‘ for t in tableList])
				self.sourceCur.execute(lockSQL)

		for t in tableList:
			#lock single table
			if not self.lockAllTables:
				self.sourceCur.execute("LOCK TABLES %s READ;"% t)
			self.__cloneSingleTable(t)
		
		if self.lockAllTables:
			self.sourceCur.execute("COMMIT;")
			self.sourceCur.execute("UNLOCK TABLES;")


class EventsClone(Clone):
	‘‘‘
	clone events
	‘‘‘
	def __init__(self,**kwargs):
		Clone.__init__(self,**kwargs)
	def clone(self):
		for event in self.__getEventList():
			self.__cloneEvent(event)

	def __getEventList(self):
		try:
			count = self.sourceCur.execute("SHOW EVENTS;")
			results=self.sourceCur.fetchall()
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
		else:
			for event in results:
				yield event[1]

	def __cloneEvent(self,event):
		try:
			count = self.sourceCur.execute("SHOW CREATE EVENT %s;" % event)
			results=self.sourceCur.fetchone()
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])

		else:
			try:
				self.dstCur.execute(results[3])
			except MySQLdb.Error,e:
				print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
			else:
				PrintInfo.say(‘event‘,event)

class TriggersClone(Clone):
	‘‘‘
	clone triggers
	‘‘‘
	def __init__(self,**kwargs):
		pass
	def clone(self):
		print "clone triggers"

class RoutinesClone(Clone):
	‘‘‘
	clone routines (functions and procedures).
	‘‘‘
	def __init__(self,**kwargs):
		Clone.__init__(self,**kwargs)

	def clone(self):
		for proc in self.__getProcList():
			self.__cloneProc(proc)

		for func in self.__getFunctionList():
			self.__cloneFunc(func)

	def __getProcList(self):
		try:
			count = self.sourceCur.execute("SHOW PROCEDURE STATUS WHERE db=‘%s‘;" % (self.sourceDb))
			results=self.sourceCur.fetchall()
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
		else:
			for proc in results:
				yield proc[1]

	def __cloneProc(self,proc):
		try:
			count = self.sourceCur.execute("SHOW CREATE PROCEDURE %s;" % proc)
			results=self.sourceCur.fetchone()
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])

		else:
			try:
				self.dstCur.execute(results[2])
			except MySQLdb.Error,e:
				print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
			else:
				PrintInfo.say(‘procedure‘,proc)
	def __getFunctionList(self):
		try:
			count = self.sourceCur.execute("SHOW FUNCTION STATUS WHERE db=‘%s‘;" % (self.sourceDb))
			results=self.sourceCur.fetchall()
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
		else:
			for func in results:
				yield func[1]

	def __cloneFunc(self,func):
		try:
			count = self.sourceCur.execute("SHOW CREATE FUNCTION %s;" % func)
			results=self.sourceCur.fetchone()
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])

		else:
			try:
				self.dstCur.execute(results[2])
			except MySQLdb.Error,e:
				print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
			else:
				PrintInfo.say(‘function‘,func)

class PrintInfo(object):
	‘‘‘
	print info
	‘‘‘
	@staticmethod
	def say(item,name):
		‘‘‘
		print info 
		‘‘‘
		curTime = time.strftime(‘%Y-%m-%d %H:%M:%S‘,time.localtime(time.time()))
		print >> sys.stdout,"[%s] INFO: - %s [%s] was transferred." % (curTime,item,name)



if __name__ == "__main__":

	parser = argparse.ArgumentParser()
	parser.add_argument(‘--sourceHost‘,action=‘store‘,dest=‘sourceHost‘,default=‘localhost‘,help=‘The source database host,default[localhost]‘)
	parser.add_argument(‘--sourcePort‘,action=‘store‘,dest=‘sourcePort‘,default=3306,help=‘The source database port,defalut[3306]‘)
	parser.add_argument(‘--sourcePasswd‘,action=‘store‘,dest=‘sourcePasswd‘,default="",help=‘The source databas passwd,default[NULL]‘)
	parser.add_argument(‘--sourceUser‘,action=‘store‘,dest=‘sourceUser‘,default=‘root‘,help=‘The source databas username,default[root]‘)
	parser.add_argument(‘--sourceDb‘,action=‘store‘,dest=‘sourceDb‘,default=None,help=‘The source databas name‘)
	parser.add_argument(‘--dstDb‘,action=‘store‘,dest=‘dstDb‘,default=None,help=‘The dst databas name‘)
	parser.add_argument(‘--sourceTable‘,action=‘store‘,dest=‘sourceTable‘,default=None,help=‘The source table,default[None]‘)
	parser.add_argument(‘--dstHost‘,action=‘store‘,dest=‘dstHost‘,default=‘localhost‘,help=‘The dst database host,default[localhost]‘)
	parser.add_argument(‘--dstPort‘,action=‘store‘,dest=‘dstPort‘,default=3306,help=‘The dst database port,defalut[3306]‘)
	parser.add_argument(‘--dstPasswd‘,action=‘store‘,dest=‘dstPasswd‘,default="",help=‘The dst databas passwd,default[NULL]‘)
	parser.add_argument(‘--dstUser‘,action=‘store‘,dest=‘dstUser‘,default=‘root‘,help=‘The dst databas username,default[root]‘)

	parser.add_argument(‘--no-data‘,‘-d‘,action=‘store_true‘,dest=‘noData‘,help=‘No row information;False is default‘)
	
	parser.add_argument(‘--lock-all-tables‘,‘-X‘,action=‘store_true‘,dest=‘lockAllTables‘,help=‘Locks all tables,default[False:Lock the table to be read]‘)

	parser.add_argument(‘--events‘,‘-E‘,action=‘store_true‘,dest=‘events‘,help=‘Clone events,default[False]‘)
	parser.add_argument(‘--routines‘,‘-R‘,action=‘store_true‘,dest=‘routines‘,help=‘Clone stored routines (functions and procedures),default[False]‘)
	parser.add_argument(‘--triggers‘,action=‘store_true‘,dest=‘triggers‘,help=‘Clone triggers for each dumped table,default[False]‘)

	args = parser.parse_args()

	if args.sourceDb is None:
		print >> sys.stderr,"ERROR: parameter --sourceDb cant not be NULL"
		sys.exit(1)

	if args.dstDb is None:
		print >> sys.stderr,"ERROR: parameter --dstDb cant not be NULL"
		sys.exit(1)

	conInfo = {
		"sourceHost":args.sourceHost,
		"sourceUser":args.sourceUser,
		"sourcePasswd":args.sourcePasswd,
		"sourceDb":args.sourceDb,
		"sourceTable":args.sourceTable,
		"noData":args.noData,
		"dstHost":args.dstHost,
		"dstUser":args.dstUser,
		"dstPasswd":args.dstPasswd,
		"dstDb":args.dstDb,
		"sourcePort":args.sourcePort,
		"dstPort":args.dstPort,
		"lockAllTables":args.lockAllTables,
		"events":args.events,
		"routines":args.routines,
		"triggers":args.triggers
		}
	print >> sys.stdout,"[%s] INFO: - Start clone database [%s:%s] To [%s:%s]" %(time.strftime(‘%Y-%m-%d %H:%M:%S‘,time.localtime(time.time())),conInfo[‘sourceHost‘],conInfo[‘sourceDb‘],conInfo[‘dstHost‘],conInfo[‘dstDb‘])
	
	m = MySQLClone(**conInfo)
	m.clone()

由于时间问题,克隆triggers的功能还没有完成,而且当数据量过大的时候,最好是将数据分批操作,如每10万load一次,这样会得到更好的性能,该程序会持续更新下。

也可以直接从github下载程序源码:https://github.com/diannaowa/mysqlclone

本文出自 “diannaowa” 博客,请务必保留此出处http://diannaowa.blog.51cto.com/3219919/1677651

Python开发MySQL数据库(表)克隆工具

标签:mysql   克隆   python   mysql克隆   

人气教程排行