#!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import paramiko,os,sys,datetime,time,MySQLdb
5
def mysql_conn(sql):
    """Run ``sql`` on the configured MySQL server and return its first row.

    Args:
        sql: Complete SQL statement text; it is executed as-is, so callers
            are responsible for any quoting of embedded values.

    Returns:
        The first result row (a dict, because a DictCursor is used), or the
        integer 0 when the query yields no rows or any ``MySQLdb.Error`` is
        raised while connecting, executing or closing.
    """
    try:
        conn = MySQLdb.connect(host='', user='', passwd='', connect_timeout=5)
        try:
            cursor = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor)
            try:
                cursor.execute(sql)
                alldata = cursor.fetchall()
            finally:
                # Release the cursor even when execute()/fetchall() fails.
                cursor.close()
        finally:
            # Always give the connection back; the original leaked it
            # whenever the query raised.
            conn.close()
    except MySQLdb.Error:
        # Callers treat 0 as "no usable answer", for errors and empty
        # result sets alike.
        return 0
    if not alldata:
        return 0
    return alldata[0]
19
def ssh_connect(ip, password, cmd, port=63008, user="root"):
    """Run ``cmd`` on a remote host over SSH and return its stripped stdout.

    Args:
        ip: Address of the remote host.
        password: Password used to authenticate ``user``.
        cmd: Shell command line executed on the remote host.
        port: SSH port; defaults to 63008, the value previously hard-coded.
        user: Login name; defaults to "root", the value previously
            hard-coded.

    Returns:
        Everything the command wrote to stdout, with leading and trailing
        whitespace removed. stderr is not inspected.

    Raises:
        Whatever paramiko raises when connecting or executing fails; the
        client is closed before the exception propagates.
    """
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts host keys that are not yet known.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(ip, port, user, password)
        stdin, stdout, stderr = ssh.exec_command(cmd)
        return stdout.read().strip()
    finally:
        # Close the client on the error path too (it used to leak there).
        ssh.close()
31
32
class Database:
    """Drive the steps that turn this host into a MySQL slave of a master.

    The public methods are individual steps: ``find_ip`` (look up this
    instance in the center_app tables), ``check_mysql`` (deploy the instance
    and start it with ``--skip-grant-tables``), ``export_table`` and
    ``down_back`` (produce and fetch the master's dumps), ``unbz2``,
    ``import_date``, ``restart_mysql`` and ``slave_start``. Each step appends
    its progress to a per-port log file and echoes it to stdout; fatal
    problems end the process through ``sys.exit()``.

    Written to run on Python 2.6+; the ``print(...)`` calls use a single
    argument so they behave the same as the original print statements.

    NOTE(review): several command lines below embed account passwords in
    clear text; they are kept unchanged because they are runtime behaviour.
    """

    # ANSI colour templates used for console output.
    _GREEN = "\033[1;32;40m%s\033[0m"
    _RED = "\033[1;31;40m%s\033[0m"

    # Directory holding create_slave_<dbport>.log (previously hard-coded).
    log_dir = "/home"
    # Local directory under which <m_app_name>/ receives the master dumps
    # (previously hard-coded).
    backup_root = "/data1/tmp"

    def __init__(self, dbport, slave_ip, stat_ip, master_ip):
        """Store connection parameters and reset all looked-up fields.

        Args:
            dbport: Port of the slave MySQL instance on this host.
            slave_ip: Address under which this host is registered.
            stat_ip: Value handed through to the deploy script as its last
                argument.
            master_ip: Address of the master host (SSH/SFTP target and
                replication master).
        """
        # SSH/SFTP login used against the master host.
        self.user = 'root'
        self.password = ''
        self.port = 63008
        # Date stamp that appears in the names of today's dump files.
        self.today = datetime.date.today().strftime('%Y%m%d')
        self.dbport = dbport
        self.slave_ip = slave_ip
        self.host = master_ip
        # The fields below are filled in by find_ip().
        self.main_id = 0
        self.dist_id = 0
        self.data_dir = ''
        self.version = ''
        self.m_version = ''
        self.app_name = ''
        self.m_app_name = ''
        self.conf_name = ''
        self.m_conf_name = ''
        self.STAT_IP = stat_ip
        # NOTE(review): nothing in this part of the file assigns m_dbport a
        # real value, yet export_table() and slave_start() use it - confirm
        # it is set elsewhere.
        self.m_dbport = 0

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _report(self, text, color=None):
        """Append ``text`` to the log and echo it, optionally coloured."""
        self.log_w(text)
        if color is None:
            print(text)
        else:
            print(color % text)

    def _fail(self, text):
        """Log and print ``text`` in red, then terminate via sys.exit()."""
        self._report(text, self._RED)
        sys.exit()

    def _port_in_use(self):
        """Return True when netstat output contains a line matching dbport."""
        # NOTE(review): grep matches the port number anywhere in the line,
        # so an unrelated port or PID containing the same digits also counts.
        count = os.popen("netstat -ntlp|grep %s|wc -l" % (self.dbport)).read().strip()
        return count != '0'

    def _await_shutdown(self):
        """Block until the port is free, reporting every 5 seconds."""
        while self._port_in_use():
            self._report("Mysql not stop,please wait...")
            time.sleep(5)

    def _await_startup(self, fail_text):
        """Wait for the port to open; exit with ``fail_text`` after 12 retries."""
        attempts = 0
        while not self._port_in_use():
            if attempts == 12:
                self._fail(fail_text)
            attempts += 1
            time.sleep(5)
        self._report("Mysql restart success !!", self._GREEN)

    def _local_dir(self):
        """Return the local dump directory, with a trailing slash."""
        return "%s/%s/" % (self.backup_root, self.m_app_name)

    def _dump_files(self, directory):
        """List today's decompressed data dumps for this app in ``directory``.

        A name qualifies when it is a regular file, contains neither 'bz2'
        nor 'table', and contains "<main_id>_<app>", today's date stamp and
        the dist_id. A missing directory yields an empty list.
        """
        if not os.path.isdir(directory):
            return []
        prefix = "%s_%s" % (self.main_id, self.app_name.split('_')[0])
        dist = str(self.dist_id)
        selected = []
        for name in os.listdir(directory):
            if not os.path.isfile(os.path.join(directory, name)):
                continue
            if 'bz2' in name or 'table' in name:
                continue
            if prefix in name and self.today in name and dist in name:
                selected.append(name)
        return selected

    def _print_status(self, row):
        """Print every column of a SHOW SLAVE STATUS row."""
        for key in row:
            print("%21s :" % key + '\t' + str(row[key]))

    # ------------------------------------------------------------------
    # public steps
    # ------------------------------------------------------------------
    def log_w(self, text):
        """Append a timestamped line containing ``text`` to the log file."""
        logfile = "%s/create_slave_%s.log" % (self.log_dir, self.dbport)
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        line = str(now) + "\t" + str(text) + "\n"
        # The context manager closes the file even if the write fails.
        with open(logfile, 'a+') as handle:
            handle.write(line)

    def find_ip(self):
        """Ask the central application whether this server is a slave.

        Looks up the row registered for (slave_ip, dbport). When its
        app_name contains "_s" the instance attributes (ids, version,
        data_dir, conf_name, master version) are filled in; in every other
        case the process exits with an error message.
        """
        text = "正在从中心应用确认本服务器是否用于从库,稍等......".decode("utf-8").encode("GBK")
        self._report(text, self._GREEN)
        # NOTE(review): values are interpolated into the SQL text because
        # mysql_conn() only accepts a finished statement.
        sql = (
            "SELECT c.main_id,c.data_dir,c.version,c.dist_id,c.ip,c.app_name "
            "FROM center_app.main_category a, center_app.sub_category b, center_app.app_info c "
            "WHERE a.id = b.main_id AND b.dist_id = c.dist_id AND b.main_id = c.main_id "
            "AND c.flag=1 AND c.del_info!=0 AND c.ip='%s' AND c.port=%s"
        ) % (self.slave_ip, self.dbport)
        alldata = mysql_conn(sql)
        if alldata == 0:
            self._fail(" This server is not matched server,server Error,please check it !!")
        if alldata["app_name"].find("_s") == -1:
            self._fail(" This server is not used slave server,server Error,please check it !!")

        self._report(" This server is slave server,server OK !!", self._GREEN)
        self.dist_id = alldata["dist_id"]
        self.main_id = alldata['main_id']
        self.version = alldata['version']
        self.app_name = alldata['app_name']
        # The master's app name is the slave's name up to the first "_".
        self.m_app_name = self.app_name.split('_')[0]
        self.data_dir = alldata['data_dir']
        self.conf_name = '%s-%s-%s-%s' % (self.main_id, self.dist_id, self.m_app_name, self.dbport)
        print("STAT_IP: %s" % (self.STAT_IP))
        sql = (
            "SELECT c.version FROM center_app.main_category a, center_app.sub_category b, center_app.app_info c "
            "WHERE a.id = b.main_id AND b.dist_id = c.dist_id AND b.main_id = c.main_id "
            "AND c.main_id=%s AND c.flag='1' AND c.del_info!=0 AND c.dist_id=%s AND c.app_name='%s'"
        ) % (self.main_id, self.dist_id, self.m_app_name)
        master = mysql_conn(sql)
        if master == 0:
            # mysql_conn() answers 0 on error/no row; subscripting that used
            # to crash with a TypeError instead of a readable message.
            self._fail(" Can not find master version,server Error,please check it !!")
        self.m_version = master["version"]

    def check_mysql(self):
        """Deploy the slave instance and start it with grant tables skipped.

        Downloads and runs the deploy script, verifies the MySQL install
        directory exists, shuts down an instance already listening on the
        port and starts mysqld_safe with ``--skip-grant-tables`` in
        preparation for the import. Exits the process on any failure.
        """
        self._report("Check mysql now,Please wait....", self._GREEN)
        # NOTE(review): the script is fetched over plain HTTP and then
        # executed by this process.
        ret = os.popen("cd /root/ && rm -f /root/deploy_mysql_instance_gao.sh && wget http://208.asktao.com/multi/deploy_mysql_instance_gao.sh && echo $?").read().strip()
        if ret != '0':
            self._fail(" mysql deploy script download fail !")
        ret = os.popen(
            "sh /root/deploy_mysql_instance_gao.sh %s %s %s %s %s %s %s"
            % (self.main_id, self.dist_id, self.app_name, self.dbport,
               self.version, self.data_dir, self.STAT_IP)
        ).read().strip()
        os.popen("rm -f /root/deploy_mysql_instance_gao.sh")
        if 'Deployment failure' in ret:
            self._fail(" mysql instance exists,please check !")

        if not os.path.isdir("/usr/local/mysql%s" % self.version):
            self._fail(" Mysql not install,Please install mysql !")
        if self._port_in_use():
            os.popen("/usr/local/mysql%s/bin/mysqladmin --socket=/tmp/mysql-%s.sock shutdown" % (self.version, self.dbport))
            time.sleep(10)
        self._await_shutdown()
        conm = "/usr/local/mysql%s/bin/mysqld_safe --defaults-file=/home/mysql/etc/%s.cnf --user=mysql --skip-grant-tables &" % (self.version, self.conf_name)
        os.popen(conm)
        self._await_startup("Mysql not Running,please start with '--skip-grant-tables' !")

    def export_table(self):
        """Dump the master's table structure into its backup directory.

        Runs mysqldump on the master over SSH. A failed dump exits the
        process; an SSH-level exception is only reported and the method
        returns, as before.
        """
        self._report("Export master table and back mysql,Please wait ....", self._GREEN)
        try:
            s = paramiko.SSHClient()
            s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            try:
                s.connect(self.host, self.port, self.user, self.password)
                conm = "/usr/local/mysql%s/bin/mysqldump --socket=/tmp/mysql-%s.sock --add-drop-table -udump -pKPt1IVNd4BnswQpc -d -A|bzip2 -2 > /data1/script/db_back/%s_%s_table_%s_%s.bz2 && echo $?" % (self.m_version, self.m_dbport, self.main_id, self.app_name.split('_')[0], self.dist_id, self.today)
                stdin, stdout, stderr = s.exec_command(conm)
                lines = stdout.readlines()
            finally:
                # Close the client on the error path as well.
                s.close()
        except Exception as e:
            text = "SSH connect Error haha!"
            print(e)
            self._report(text, self._RED)
            # Deliberately no sys.exit(): the original had it commented out.
            return

        # The command echoes "0" only on success; no output at all is a
        # failed export, not an SSH problem.
        result = ''
        if lines:
            result = lines[-1].strip()
        if result == '0':
            self._report(" Export_table success !")
        else:
            self._fail("Export_table Error !")

    def down_back(self):
        """Copy today's database backup and table structure from the master.

        Every remote file whose name contains today's date stamp, the
        dist_id and "<main_id>_<app>" is fetched over SFTP into the local
        dump directory. Any exception is reported and ends the process.
        """
        os.popen("mkdir -p %s/%s" % (self.backup_root, self.m_app_name))
        local_dir = self._local_dir()
        remote_dir = '/data1/script/db_back/'
        try:
            t = paramiko.Transport((self.host, self.port))
            try:
                t.connect(username=self.user, password=self.password)
                sftp = paramiko.SFTPClient.from_transport(t)
                files = sftp.listdir(remote_dir)
                self._report("Download back file,Please wait ....", self._GREEN)
                self._report(" Beginning to download file from %s %s " % (self.host, datetime.datetime.now()))
                prefix = "%s_%s" % (self.main_id, self.app_name.split('_')[0])
                dist = str(self.dist_id)
                for f in files:
                    if self.today in f and dist in f and prefix in f:
                        remote_path = os.path.join(remote_dir, f)
                        self._report(" Downloading file:%s:%s" % (self.host, remote_path))
                        sftp.get(remote_path, os.path.join(local_dir, f))
            finally:
                # Close the transport even when a transfer fails.
                t.close()
            self._report(' Download All back file success %s ' % datetime.datetime.now())
        except Exception as e:
            text = "SFTP connect Error !"
            self.log_w(text)
            print(self._RED % text)
            print(e)
            sys.exit()

    def unbz2(self):
        """Decompress the downloaded bz2 archives, keeping the originals."""
        self._report("Decompression file,Please wait ....", self._GREEN)
        self._report(' Beginning to Decompression file from %s' % datetime.datetime.now())
        conm = 'bzip2 -dfk %s/%s/%s_%s*%s_%s*.bz2 && echo $?' % (
            self.backup_root, self.m_app_name, self.main_id,
            self.app_name.split('_')[0], self.dist_id, self.today)
        bz = os.popen(conm).read().strip()
        if bz == '0':
            self._report(' Decompression file success %s' % datetime.datetime.now())
        else:
            self._fail("Decompression Error !")

    def restart_mysql(self):
        """Enable binary logging in the config and restart MySQL normally."""
        self._report("Restart mysql Now,Please wait ....", self._GREEN)
        os.popen("sed -i 's/#log-bin/log-bin/' /home/mysql/etc/%s.cnf" % (self.conf_name))
        os.popen("sed -i 's/#binlog_format/binlog_format/' /home/mysql/etc/%s.cnf" % (self.conf_name))
        os.popen("/usr/local/mysql%s/bin/mysqladmin --socket=/tmp/mysql-%s.sock shutdown" % (self.version, self.dbport))
        self._await_shutdown()
        time.sleep(10)
        os.popen("/usr/local/mysql%s/bin/mysqld_safe --defaults-file=/home/mysql/etc/%s.cnf --user=mysql &" % (self.version, self.conf_name))
        self._await_startup("Mysql restart Error,please check !!")

        if self.app_name == "adb_s" and self.version == "5.1" and self.m_version == "4.0":
            os.popen("/usr/local/mysql%s/bin/mysql_upgrade --socket=/tmp/mysql-%s.sock -udev2 -pPs6iW7-Jp39Rx5" % (self.version, self.dbport))

    def import_date(self):
        """Import the table structure, then every data dump, into the slave.

        Exits the process as soon as one import command fails.
        """
        self._report("Slave import master date,Please wait ....", self._GREEN)
        # Table structure first.
        backup_dir = self._local_dir()
        table = '%s_%s_table_%s_%s' % (self.main_id, self.app_name.split('_')[0], self.dist_id, self.today)
        conm = "/usr/local/mysql%s/bin/mysql --socket=/tmp/mysql-%s.sock < %s%s && echo $?" % (self.version, self.dbport, backup_dir, table)
        result = os.popen(conm).read().strip()
        if result != '0':
            self._fail("Import Table structure Error !")
        self._report(" Import %s success !" % table)

        # Then the data dumps.
        for f in self._dump_files(backup_dir):
            conm = "/usr/local/mysql%s/bin/mysql --socket=/tmp/mysql-%s.sock < %s && echo $?" % (self.version, self.dbport, os.path.join(backup_dir, f))
            result = os.popen(conm).read().strip()
            if result != '0':
                self._fail("Import Database %s Error !" % self.app_name.split('_')[0])
            self._report(" Import %s success !" % f)

    def slave_start(self):
        """Point the slave at the master, start replication and show status.

        Exits the process when no binlog position can be derived from the
        dump files or when MySQLdb reports an error.
        """
        self._report("Settings Slave,Please wait ....", self._GREEN)
        position = self.bin_pos()
        if position is None:
            # Unpacking None used to raise a TypeError here.
            self._fail(" Can not find master bin-log position,Please check it!")
        binlog, log_pos = position
        sql = "change master to master_host='%s',master_user='repl',master_password='H7RYbCkGHmm_P1XO',master_port=%s,master_log_file='%s',master_log_pos=%s;" % (self.host, self.m_dbport, binlog, log_pos)
        try:
            conn = MySQLdb.connect(host='127.0.0.1', port=self.dbport, user='', passwd='', connect_timeout=5)
            try:
                cursor = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor)
                try:
                    cursor.execute(sql)
                    cursor.execute("slave start;")
                    time.sleep(3)
                    cursor.execute("show slave status;")
                    alldata = cursor.fetchall()[0]
                    if alldata["Slave_IO_Running"] == "Yes" and alldata["Slave_SQL_Running"] == "Yes":
                        self._report(" Settings Slave success!", self._GREEN)
                        self._print_status(alldata)
                        time.sleep(5)
                        print("")
                        print("******************************************")
                        print("")
                        # Show the status a second time after a short pause.
                        cursor.execute("show slave status;")
                        alldata = cursor.fetchall()[0]
                        self._print_status(alldata)
                    else:
                        self._report(" Settings Slave Error,Please check it!", self._RED)
                finally:
                    cursor.close()
            finally:
                # Closed on the error path too (it used to leak there).
                conn.close()
        except MySQLdb.Error as e:
            self.log_w(e)
            print(e)
            sys.exit()

    def bin_pos(self):
        """Return the binlog file and position encoded in a dump file name.

        The name of the first matching dump file is split on "_"; the fifth
        field is the bin-log file preceding the master backup and the sixth
        its position.

        Returns:
            ``(binlog, log_pos)`` as strings, or None when the dump
            directory is missing or no file name carries both fields.
        """
        for name in self._dump_files(self._local_dir()):
            parts = name.split('_')
            # Skip names that are too short instead of raising IndexError.
            if len(parts) > 5:
                return parts[4], parts[5]
        return None
346 ‘‘‘
347 def eth1_ip(self):
348 eth0_ip = self.m_ip()
349 try:
350 conm = "ifconfig | grep Mask | grep 192.168 | awk -F ‘:‘ ‘{print $2}‘ | awk -F ‘ ‘ ‘{print $1}‘"
351 result = ssh_connect(eth0_ip,self.password,conm)
352 if result != ‘‘:
353 text = " 成功获取主库内网地址 :%s".decode("utf-8").encode("GBK") % result
354 self.log_w(text)
355 print "\033[1;32;40m%s\033[0m" % text
356 #self.host=result
357
358 else:
359 text = " 无法获取主库内网地址".decode("utf-8").encode("GBK")
360 self.log_w(text)
361 print "\033[1;31;40m%s\033[0m" % text
362 sys.exit()
363 except Exception,e:
364 text = "SSH connect Error !"
365 self.log_w(text)