作者/蔡錫生
簡介
gocraft/work是一款使用go開發的任務處理軟件,通過redis存儲任務隊列,可以使用工作池同時處理多個任務。本文主要介紹任務注冊和任務消費的源代碼。
功能特性
• Fast and efficient. Faster than this, this, and this. See below for benchmarks.
• Reliable - don't lose jobs even if your process crashes.
• Middleware on jobs -- good for metrics instrumentation, logging, etc.
• If a job fails, it will be retried a specified number of times.
• Schedule jobs to hAppen in the future.
• Enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once.
• Web UI to manage failed jobs and observe the system.
• Periodically enqueue jobs on a cron-like schedule.
• Pause / unpause jobs and control concurrency within and across processes.
注冊Job
注冊Job流程
1. 創建redis client pool。
2. 創建對象,定義任務處理函數。
3. 創建任務工作池,需要傳入被處理對象結構體,最大并發數,命名空間,redis client pool。
4. 創建Job,需要傳入job名稱和job處理函數, job在redis中使用列表存儲,key的組成:nameSapce:job:jobName,同一namespace支持多種類型任務處理。
這里使用任務名稱作為key存入redis, 任務處理參數存放到列表中
func main() {
// Make a new pool. Arguments:
// Context{} is a struct that will be the context for the request.
// 10 is the max concurrency
// "my_app_namespace" is the Redis namespace
// redisPool is a Redis pool
pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)
// Add middleware that will be executed for each job
pool.Middleware((*Context).Log)
// Map the name of jobs to handler functions
// pool 中的 jobTypes是一個字典,key 是任務名稱, value 是 任務處理函數
// 當有任務的時候,會將任務需要的參數 放入到redis key 為jobName的列表中
// 第二個參數必須是 工作池對象的方法
pool.Job("send_email", (*Context).SendEmail)
// Customize options:
pool.JobWithOptions("export", work.JobOptions{Priority: 10, MaxFails: 1}, (*Context).Export)
// Start processing jobs
pool.Start()
...
}
發送Job
發送job其實調用NewEnqueuer方法向redis的列表中壓入元素(具體的內容是任務參數)。
package main
import (
"github.com/gomodule/redigo/redis"
"github.com/gocraft/work"
)
// Make a redis pool
var redisPool = &redis.Pool{
MaxActive: 5,
MaxIdle: 5,
Wait: true,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", ":6379")
},
}
// Make an enqueuer with a particular namespace
var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool)
func main() {
// Enqueue a job named "send_email" with the specified parameters.
_, err := enqueuer.Enqueue("send_email", work.Q{"address": "[email protected]", "subject": "hello world", "customer_id": 4})
if err != nil {
log.Fatal(err)
}
}
Woker Fetch Job
在New WrokPool的時候會根據并法參數concurrency,創建同等個數的woker。
Worker是一個job處理者,通過永久for循環,不間斷的從redis的任務隊列中獲取任務,在處理任務的時候,協程阻塞,等待一個任務處理完,再繼續下一個。
下面的代碼是worker在for循環中的重要操作(1) fetch job (2) process job
func (w *worker) loop() {
for {
select {
。。。
case <-timer.C:
job, err := w.fetchJob()
w.process(job)
}
}
}
fetchJob本質是redis的pop,push操作。首先將redis列表中的任務移除,然后再放入到處理隊列中,這個操作必須是原子操作(原子性是指事務是一個不可再分割的工作單元,事務中的操作要么都發生,要么都不發生),作者使用了lua腳本完成。最后返回一個job對象,里面有后面任務處理函數需要的args,即這里的rawJson。
func (w *worker) fetchJob() (*Job, error) {
scriptArgs = append(scriptArgs, w.poolID) // ARGV[1]
...
values, err := redis.Values(w.redisFetchScript.Do(conn, scriptArgs...))
...
job, err := newJob(rawJSON, dequeuedFrom, inProgQueue)
..
return job, nil
}
Worker handle Job.
Pool.JobWithOptions(InstallMasterJob, work.JobOptions{Priority: 1, MaxFails: 1}, ConsumeJob)
workpool注冊任務ConsumeJob后, 該任務ConsumeJob會被賦值給worker.jobTypes[job.Name].GenericHandler, 他的反射類型被賦值給了jobType.DynamicHandler。如果該消費任務使用了上下文參數。
創建消費任務的2種方法
• If you don't need context:
func YourFunctionName(job *work.Job) error.
• If you want your handler to accept a context:
func (c *Context) YourFunctionName(job *work.Job) error // or,
func YourFunctionName(c *Context, job *work.Job) error.
func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interface{}) *WorkerPool {
jobOpts = applyDefaultsAndValidate(jobOpts)
vfn := reflect.ValueOf(fn)
validateHandlerType(wp.contextType, vfn)
jt := &jobType{
Name: name,
//vfn 任務消費方法的反射類型, 如果消費方法中有ctx 參數,那么會調用反射執行
DynamicHandler: vfn,
JobOptions: jobOpts,
}
if gh, ok := fn.(func(*Job) error); ok {
// 用戶的任務消費函數,被賦值給了jobType的GenericHandler, 如果消費方法只有一個job參數,則執行GenericHandler
jt.IsGeneric = true
jt.GenericHandler = gh
}
wp.jobTypes[name] = jt
for _, w := range wp.workers {
w.updateMiddlewareAndJobTypes(wp.middleware, wp.jobTypes)
}
return wp
}
執行消費任務的真正代碼
worker對象的processJob(job * Job)方法 調用了runJob方法執行GenericHandler or DynamicHandler.Call。
func runJob(job *Job, ctxType reflect.Type, middleware []*middlewareHandler, jt *jobType) (returnCtx reflect.Value, returnError error) {
。。。。
next = func() error {
。。。。
if jt.IsGeneric {
// 任務消費方法沒有ctx時候執行
return jt.GenericHandler(job)
}
// 任務消費方法有ctx時執行
res := jt.DynamicHandler.Call([]reflect.Value{returnCtx, reflect.ValueOf(job)})
x := res[0].Interface()
if x == nil {
return nil
}
return x.(error)
}
...
returnError = next()
return
}
LStack產品簡介
面向行業應用開發商(ISV/SI)提供混合云/邊緣云場景下云原生應用開發測試、交付、運維一站式服務,幫助企業采用云原生敏捷開發交付方法論,從而提高軟件開發人員效率、減少運維成本,加快數字化轉型,并最終實現業務創新。






