当前位置:Gxlcms > 数据库问题 > golang+数据库定时任务

golang+数据库定时任务

时间:2021-07-01 10:21:17 帮助过:3人阅读

golang+数据库定时任务


   项目背景大致如下,楼主在用nodejs写项目时遇到一些需要定时去处理的事情,例如僵尸用户定时清除,一些产品定时下架,邮件定时发送等等! 期初使用nodejs setTimeOut递归嵌套实现,后来发现内存不断飙升,故而放弃,最终改用了性能不错的golang实现

数据库设计


技术分享

字段名称 含义
id 编号
name 任务名称
create_at 创建时间
type 1. 执行一次 2.循环执行
separate_time 执行间隔
status 执行状态 0.未开始 1. 执行中 -1.执行失败 -2.手动暂停
remark 备注信息
fn 要执行的数据库存储过程或函数
start_time 开始执行时间
next_exec_time 下次执行时间
last_exec_time 上次执行时间
fn_type email, sql 等等

大致实现流程


  1. 需要有一个死循环,sleep 10s启动然后sleep 10 …
    		for {
			time.Sleep(10 * time.Second)
			go execTask(*db) //使用子进程执行,防止卡死主进程
		}
  1. 开始执行,查找需要执行的任务
rows, err := db.Query("SELECT id,name,status,type,fn,fn_type, separate_time FROM public.tasks where (status = 0 and start_time < now()) or (status = 1 and next_exec_time < now());")
  1. 执行任务
res, err := db.Exec(fn)
  1. 执行任务成功后,更新下次执行时间
func setTaskNextExecTime(db sql.DB, taskId string, separateTime int64) error {
	next_exec_time := time.Now().Unix() + separateTime
	nextTime := time.Unix(next_exec_time, 999)
	res, err := db.Exec("UPDATE tasks set status = 1, last_exec_time=now(), next_exec_time=$2 WHERE id = $1::uuid", taskId, nextTime)
	res = nil
	log.Println(res)
	return err;
}

优缺点


    优点:
        1. 所有任务执行状态都可以查询到,例如任务异常或者上次执行时间,下次执行时间
        2. 增加一个定时任务,只需要在数据库插入一条记录就OK
    缺点:
        1. 如果要绑定非数据库可操作任务,需要自己扩展
    

项目源码


// MTask project main.go
package main

import (
	"database/sql"
	_ "github.com/lib/pq"
	"log"
	"time"
	"os"
	"io/ioutil"
	"encoding/json"
)

//配置结构体
type Conf struct {
	Db map[string] string
}

//读取配置文件
func readConf(path string) (Conf, error) {
	var c Conf
	var err error
	
	fi, err := os.Open(path)
	if err != nil {
		return c, err 
	} else {
		defer fi.Close()
		
		//读取配置文件
		fd, err := ioutil.ReadAll(fi)
		if err != nil {
			return c, err
		} else {
			var c Conf
			err = json.Unmarshal(fd, &c)
			if err != nil {
				return c, err
			} else {
				return c, err
			}
		}
	}
	return c, err
}


func main() {
	c, err := readConf("./conf.json")
	if err != nil {
		log.Print(err)
		panic(err)
	}
	db, err := sql.Open("postgres", c.Db["postgres"])
	if err != nil {
		log.Print(err)
	} else {
		defer db.Close()
		for {
			time.Sleep(10 * time.Second)
			go execTask(*db)
		}
	}
}

func execTask(db sql.DB) {
	defer func() {
		if err := recover(); err != nil {
			log.Print(err)
			log.Printf("执行任务时发生错误:%s", err)
		}
	}();
	
	log.Println("开始执行任务.......")
	rows, err := db.Query("SELECT id,name,status,type,fn,fn_type, separate_time FROM public.tasks where (status = 0 and start_time < now()) or (status = 1 and next_exec_time < now());")
	if err != nil {
		log.Print(err)
	} else {
		defer rows.Close()
		for rows.Next() {
			var id string
			var name string
			var status int
			var taskType int
			var separateTime int64
			var fn string
			var fnType string

			err = rows.Scan(&id, &name, &status, &taskType, &fn, &fnType, &separateTime)

			if err != nil {
				//记录错误,同时更新任务信息为异常
				log.Print(err)
				err = setTaskExecFail(db, id)
				if err != nil {
					log.Print(err)
				}
			} else {
				if (fnType == "sql") {
					res, err := db.Exec(fn)
					if err != nil {
						log.Print(err)
						err = setTaskExecFail(db, id)
						if err != nil {
							log.Print(err)
						}
						log.Printf("任务:%s执行时出错", name)
					} else {
						res = nil
						log.Println(res)
						
						if taskType == 1 {
							err = setTaskExecSuccess(db, id)
							if err != nil {
								log.Print(err)
							}
							log.Printf("任务:%s执行完成", name)
						} else {
							err = setTaskNextExecTime(db, id, separateTime)
							if err != nil {
								log.Print(err)
							}
						}
						log.Printf("任务:%s执行成功", name)
					}
				} else if (fnType == "bash") {
					log.Printf("这是一个bash任务")
				} else if (fnType == "python") {
					log.Printf("这是一个python任务")
				} else if (fnType == "email") {
					//发送email任务
					err = ExecEmailTask(db)
					if err != nil {
						handleFail(db, id)
						log.Println(err)
					} else {
						handleSuccess(db, id)
					}
					log.Printf("发送邮件任务")
					setTaskExecSuccess(db, id)
					setTaskNextExecTime(db, id, separateTime)
				} else if (fnType == "sms") {
					//发送短信任务
					log.Printf("发送短信任务")
				}
				
			}
		}

		err = rows.Err()
		if err != nil {
			log.Print(err)
		}
	}
	log.Println("结束执行任务....")
}

func setTaskExecFail(db sql.DB, taskId string) error {
	res, err := db.Exec("UPDATE tasks set status = -2 WHERE id = $1::uuid", taskId)
	err = nil
	log.Println(res)
	return err
}

func setTaskExecSuccess(db sql.DB, taskId string) error {
	res, err := db.Exec("UPDATE tasks set status = 2 WHERE id = $1::uuid", taskId)
	err = nil
	log.Println(res)
	return err
}

func setTaskNextExecTime(db sql.DB, taskId string, separateTime int64) error {
	next_exec_time := time.Now().Unix() + separateTime
	nextTime := time.Unix(next_exec_time, 999)
	res, err := db.Exec("UPDATE tasks set status = 1, last_exec_time=now(), next_exec_time=$2 WHERE id = $1::uuid", taskId, nextTime)
	res = nil
	log.Println(res)
	return err;
}

golang+数据库定时任务

标签:

人气教程排行