当前位置:Gxlcms > 数据库问题 > python连接mysql

python连接mysql

时间:2021-07-01 10:21:17 帮助过:19人阅读

"""Slow-SQL monitor: read recent query records from MySQL and alert via DingTalk.

The script reads connection and alarm settings from ``properties.cfg`` in the
current working directory, selects every record newer than the timestamp
remembered from the previous run, and sends one DingTalk text message for each
record whose duration reaches the configured threshold.
"""
import datetime
import json
import traceback
from configparser import ConfigParser

import mysql.connector
import requests

# File that remembers the timestamp of the newest row seen by the last run.
STATE_FILE = "/tmp/pandora-sql-monitoring.time"

# Seconds to wait for the DingTalk webhook before giving up on one message.
REQUEST_TIMEOUT = 10

# The table name cannot be bound as a query parameter, so it is formatted in;
# it comes from the local config file. The timestamp is bound with %s.
SQL_PATTERN = (
    "select duration,query_sql,type,timestamp from {} "
    "where timestamp > %s order by timestamp"
)


def sendmessage(url, msg, at_list=None):
    """Send a plain-text message to a DingTalk robot webhook.

    Args:
        url: Full webhook URL of the robot (including the access token).
        msg: Text content of the message.
        at_list: Optional list of mobile numbers to @-mention. ``None`` (the
            default) means nobody is mentioned.
    """
    headers = {
        "Content-Type": "application/json ;charset=utf-8 "
    }
    payload = {
        "msgtype": "text",
        "text": {"content": msg},
        "at": {
            "atMobiles": list(at_list) if at_list else [],  # e.g. ["10086"]
            "isAtAll": 0  # set to 1 to @-mention everyone
        }
    }
    # A timeout keeps a stalled webhook from blocking the monitor forever.
    res = requests.post(
        url,
        data=json.dumps(payload),
        headers=headers,
        timeout=REQUEST_TIMEOUT,
    )
    print(str(datetime.datetime.now()) + " 发送钉钉消息:" + str(res.text))


def _read_last_query_time():
    """Return the timestamp stored by the previous run, or "" if there is none."""
    try:
        with open(STATE_FILE, "r") as file:
            return file.readline().strip()
    except FileNotFoundError:
        # First run: no state file has been written yet.
        return ""


def _write_last_query_time(last_query_time):
    """Persist the timestamp of the newest row processed in this run."""
    with open(STATE_FILE, "w") as file:
        file.write(last_query_time)


def _format_alarm(row):
    """Build the alert text for one result row."""
    return (
        "耗时:" + str(row["duration"] / 1000) + "秒\n"
        + "时间:" + str(row["timestamp"]) + "\n"
        + "sql:" + str(row["query_sql"]) + "\n"
    )


def query_sql(dingding_url, mysql_conn, table, duration_threshold=70000, at_list=None):
    """Query new records from ``table`` and alert on the slow ones.

    Rows with a timestamp later than the one remembered in ``STATE_FILE`` are
    fetched (falling back to one hour ago when nothing is remembered). Every
    row whose ``duration`` is at least ``duration_threshold`` is sent as a
    DingTalk message, rotating through the given webhook URLs.

    Args:
        dingding_url: List of DingTalk webhook URLs, used round-robin.
        mysql_conn: An open ``mysql.connector`` connection.
        table: Name of the table to read from.
        duration_threshold: Minimum duration that triggers an alert; compared
            against the ``duration`` column after converting both to ``int``.
        at_list: Optional list of mobile numbers to @-mention in each alert.
    """
    # Resume from the last remembered timestamp, defaulting to one hour ago.
    pre_time_str = _read_last_query_time()
    if not pre_time_str:
        pre_time = datetime.datetime.now() - datetime.timedelta(hours=1)
        pre_time_str = pre_time.strftime("%Y-%m-%d %H:%M:%S")

    sql = SQL_PATTERN.format(table)
    print(sql, pre_time_str)

    alarm_list = []
    last_query_time = None
    # Best effort: a failed query is reported but must not abort the run;
    # rows processed before the failure are still alerted on below.
    try:
        cursor = mysql_conn.cursor(dictionary=True)
        try:
            cursor.execute(sql, (pre_time_str,))
            result = cursor.fetchall()
            mysql_conn.commit()
        finally:
            cursor.close()
        threshold = int(duration_threshold)
        for row in result:
            last_query_time = str(row["timestamp"])
            if int(row["duration"]) >= threshold:
                alarm_list.append(row)
    except Exception:
        traceback.print_exc()

    # Remember the newest timestamp so the next run starts after it.
    if last_query_time:
        _write_last_query_time(last_query_time)

    if alarm_list and not dingding_url:
        print("no DingTalk webhook configured; %d alarm(s) not sent" % len(alarm_list))
        return

    # Rotate through the robots to stay under the per-robot send limit.
    for index, item in enumerate(alarm_list):
        sendmessage(dingding_url[index % len(dingding_url)], _format_alarm(item), at_list)


def main():
    """Load ``properties.cfg``, connect to MySQL and run one monitoring pass."""
    cp = ConfigParser()
    cp.read("properties.cfg")

    # MySQL settings
    mysql_host = cp.get("mysql", "host")
    mysql_port = cp.get("mysql", "port")
    mysql_user = cp.get("mysql", "user")
    mysql_password = cp.get("mysql", "password")
    mysql_db = cp.get("mysql", "database")
    mysql_table = cp.get("mysql", "table")

    # Several robots can be listed to work around DingTalk's per-robot
    # message limit; they are used in rotation.
    dingding_url = [cp.get("dingding", "url1")]
    at_list = ["10086"]

    # Alarm settings
    duration_threshold = cp.get("alarm", "duration_threshold")

    mysql_conn = mysql.connector.connect(
        host=mysql_host,
        port=int(mysql_port),
        user=mysql_user,
        passwd=mysql_password,
        database=mysql_db,
    )
    try:
        query_sql(dingding_url, mysql_conn, mysql_table, duration_threshold, at_list)
    finally:
        mysql_conn.close()


if __name__ == "__main__":
    main()

 

同目录下的 properties.cfg

[dingding]
url1 = https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxxxxxxxxxxxxxxxxxxxxx

[mysql]
host = 127.0.0.1
port = 3306
user = root
password = 123456
database = test
table = test_table

[alarm]
duration_threshold = 3000

 

python连接mysql

标签:构建   相关信息   tab   password   file   roo   set   one   sts   

人气教程排行