# [同步脚本]mysql-elasticsearch同步 -- [Sync script] MySQL -> Elasticsearch sync
# 时间:2021-07-01 10:21:17 (posted at)
# 帮助过:30人阅读 (read by 30 people)
#!/usr/bin/env python
# coding=utf-8
from __future__ import print_function

import datetime
import os
import sys
import time

from sqlalchemy import text

# Make the elabels-flask project packages importable (developer-specific path).
sys.path.append('/Users/cangyufu/work_jbkj/elabels-flask')

from modules.utils.commons import app, redispool, db_master, db_slave
from modules.utils.mysqldb import db_obj_dict
from service.myelasticsearch.index import es
# Seconds sync_main sleeps after a polling round in which no table reported
# any progress.
CONST_SLEEP = 3
# Not referenced in this file; presumably the target Elasticsearch index name
# -- TODO confirm.
WORK_INDEX = 'test'
#https://stackoverflow.com/questions/136168/get-last-n-lines-of-a-file-with-python-similar-to-tail
def tail(f, lines=1
):
total_lines_wanted =
lines
BLOCK_SIZE = 1024
f.seek(0, 2
)
block_end_byte =
f.tell()
lines_to_go =
total_lines_wanted
block_number = -1
blocks = []
# blocks of size BLOCK_SIZE, in reverse order starting
# from the end of the file
while lines_to_go > 0
and block_end_byte >
0:
if (block_end_byte - BLOCK_SIZE >
0):
# read the last block we haven‘t yet read
f.seek(block_number*BLOCK_SIZE, 2
)
blocks.append(f.read(BLOCK_SIZE))
else:
# file too small, start from begining
f.seek(0,0)
# only read what was not read
blocks.append(f.read(block_end_byte))
lines_found = blocks[-1].count(
‘\n‘)
lines_to_go -=
lines_found
block_end_byte -=
BLOCK_SIZE
block_number -= 1
all_read_text =
‘‘.join(reversed(blocks))
return ‘\n‘.join(all_read_text.splitlines()[-
total_lines_wanted:])
def is_file_exists(filename):
    """Ensure *filename* exists, seeding a new file with the epoch timestamp.

    If *filename* is not an existing regular file, it is created containing
    the single line ``1970-01-01 00:00:00`` (the initial sync position). An
    existing file is left untouched. Returns None.
    """
    if not os.path.isfile(filename):
        # A bytes literal keeps the binary-mode write valid on Python 3 too;
        # `with` closes the handle even if the write fails.
        with open(filename, 'wb') as state_file:
            state_file.write(b"1970-01-01 00:00:00\n")
#传入要监控的表名
def sync_main(*
args):
for table
in args:
try:
callable(globals()[‘monitor_‘+
table])
except Exception:
raise Exception(
‘lack function monitor_{}‘.format(table))
for table
in args:
filename =
‘‘.join([
‘monitor_‘, table,
‘.txt‘])
locals()[table+
‘path‘] = os.path.join(os.path.dirname(
__file__), filename)
is_file_exists(locals()[table+
‘path‘])
locals()[table+
‘file‘] = open(locals()[table+
‘path‘],
‘rb+‘)
try:
print "begin"
while True:
count =
0
for table
in args:
print ‘handleing ‘+
table
last_time = tail(locals()[table+
‘file‘], 1
)
update_time = globals()[
‘monitor_‘+
table](last_time)
print update_time
if update_time ==
last_time:
count += 1
continue
locals()[table +
‘file‘].write(update_time+
‘\n‘)
locals()[table +
‘file‘].flush()
if count ==
len(args):
time.sleep(CONST_SLEEP)
except Exception, e:
print e
raise e
finally:
for table
in args:
locals()[table +
‘file‘].close()
########################################################################################################################
#
# 如果要监控哪个表,必须要实现 函数 monitor_table_name,比如要监控table1表,就必须要实现monitor_table1函数,
# 传入参数为开始更新的起始时间,初始化时候为1970-01-01 00:00:00,返回更新到的最新的时间
#
########################################################################################################################
def monitor_table1(last_time):
pass
return last_time
def monitor_table2(last_time):
    """Placeholder monitor for ``table2``: reports no progress.

    Returning *last_time* unchanged means sync_main records nothing new and
    counts this table as idle for the round.
    """
    # TODO: implement the actual sync for table2 and return the newest
    # timestamp processed.
    return last_time
def trans_date_time(dt, fmt="%Y-%m-%d %H:%M:%S"):
    """Parse the timestamp string *dt* into a naive ``datetime.datetime``.

    *fmt* defaults to the ``YYYY-MM-DD HH:MM:SS`` layout of the seed value
    written to the monitor state files. Pass another ``strptime`` format to
    parse other layouts.

    Raises:
        ValueError: if *dt* does not match *fmt*.
    """
    return datetime.datetime.strptime(dt, fmt)
if __name__ == "__main__":
    # Start the endless polling loop only when run as a script, never on import.
    sync_main('table1', 'table2')
# [同步脚本]mysql-elasticsearch同步 -- [Sync script] MySQL -> Elasticsearch sync
# 标签 (tags): 同步 lines 开始 pat ast 网上 too and was