亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.430618.com 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

「技術干貨」gocraft/work工作隊列源碼簡介

 

作者/蔡錫生

 

簡介

 

gocraft/work是一款使用go開發的任務處理軟件,通過redis存儲任務隊列,可以使用工作池同時處理多個任務。本文主要介紹任務注冊和任務消費的源代碼。

 

功能特性

 

• Fast and efficient. Faster than this, this, and this. See below for benchmarks.

 

• Reliable - don't lose jobs even if your process crashes.

 

• Middleware on jobs -- good for metrics instrumentation, logging, etc.

 

• If a job fails, it will be retried a specified number of times.

 

• Schedule jobs to hAppen in the future.

 

• Enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once.

 

• Web UI to manage failed jobs and observe the system.

 

• Periodically enqueue jobs on a cron-like schedule.

 

• Pause / unpause jobs and control concurrency within and across processes.

 

注冊Job

 

注冊Job流程

 

1. 創建redis client pool。

 

2. 創建對象,定義任務處理函數。

 

3. 創建任務工作池,需要傳入被處理對象結構體,最大并發數,命名空間,redis client pool。

 

4. 創建Job,需要傳入job名稱和job處理函數, job在redis中使用列表存儲,key的組成:nameSapce:job:jobName,同一namespace支持多種類型任務處理。

 

這里使用任務名稱作為key存入redis, 任務處理參數存放到列表中

 

func main() {
  // Make a new pool. Arguments:
  // Context{} is a struct that will be the context for the request.
  // 10 is the max concurrency
  // "my_app_namespace" is the Redis namespace
  // redisPool is a Redis pool
  pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)

  // Add middleware that will be executed for each job
  pool.Middleware((*Context).Log)

  // Map the name of jobs to handler functions
  // pool 中的 jobTypes是一個字典,key 是任務名稱, value 是 任務處理函數
  // 當有任務的時候,會將任務需要的參數 放入到redis key 為jobName的列表中
  // 第二個參數必須是 工作池對象的方法
  pool.Job("send_email", (*Context).SendEmail)

  // Customize options:
  pool.JobWithOptions("export", work.JobOptions{Priority: 10, MaxFails: 1}, (*Context).Export)

  // Start processing jobs
  pool.Start()
  ...
}

 

發送Job

 

發送job其實調用NewEnqueuer方法向redis的列表中壓入元素(具體的內容是任務參數)。

 

package main

import (
  "github.com/gomodule/redigo/redis"
  "github.com/gocraft/work"
)

// Make a redis pool
var redisPool = &redis.Pool{
  MaxActive: 5,
  MaxIdle: 5,
  Wait: true,
  Dial: func() (redis.Conn, error) {
    return redis.Dial("tcp", ":6379")
  },
}

// Make an enqueuer with a particular namespace
var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool)

func main() {
  // Enqueue a job named "send_email" with the specified parameters.
  _, err := enqueuer.Enqueue("send_email", work.Q{"address": "[email protected]", "subject": "hello world", "customer_id": 4})
  if err != nil {
    log.Fatal(err)
  }
}

 

Woker Fetch Job

 

在New WrokPool的時候會根據并法參數concurrency,創建同等個數的woker。

 

Worker是一個job處理者,通過永久for循環,不間斷的從redis的任務隊列中獲取任務,在處理任務的時候,協程阻塞,等待一個任務處理完,再繼續下一個。

 

下面的代碼是worker在for循環中的重要操作(1) fetch job (2) process job

 

func (w *worker) loop() {
  for {
    select {
     。。。
    case <-timer.C:
      job, err := w.fetchJob()
      w.process(job)
    }
  }
}

 

fetchJob本質是redis的pop,push操作。首先將redis列表中的任務移除,然后再放入到處理隊列中,這個操作必須是原子操作(原子性是指事務是一個不可再分割的工作單元,事務中的操作要么都發生,要么都不發生),作者使用了lua腳本完成。最后返回一個job對象,里面有后面任務處理函數需要的args,即這里的rawJson。

 

func (w *worker) fetchJob() (*Job, error) {

   scriptArgs = append(scriptArgs, w.poolID) // ARGV[1]
  ...
   values, err := redis.Values(w.redisFetchScript.Do(conn, scriptArgs...))
    ...
   job, err := newJob(rawJSON, dequeuedFrom, inProgQueue)
    ..
   return job, nil
}

 

Worker handle Job.

 

Pool.JobWithOptions(InstallMasterJob, work.JobOptions{Priority: 1, MaxFails: 1}, ConsumeJob)

 

workpool注冊任務ConsumeJob后, 該任務ConsumeJob會被賦值給worker.jobTypes[job.Name].GenericHandler, 他的反射類型被賦值給了jobType.DynamicHandler。如果該消費任務使用了上下文參數。

 

創建消費任務的2種方法

 

• If you don't need context:

 

func YourFunctionName(job *work.Job) error.

 

• If you want your handler to accept a context:

 

func (c *Context) YourFunctionName(job *work.Job) error // or,

 

func YourFunctionName(c *Context, job *work.Job) error.

 

func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interface{}) *WorkerPool {
  jobOpts = applyDefaultsAndValidate(jobOpts)

  vfn := reflect.ValueOf(fn)
  validateHandlerType(wp.contextType, vfn)
  jt := &jobType{
    Name:           name,
    //vfn 任務消費方法的反射類型, 如果消費方法中有ctx 參數,那么會調用反射執行
    DynamicHandler: vfn, 
    JobOptions:     jobOpts,
  }
  if gh, ok := fn.(func(*Job) error); ok {
    // 用戶的任務消費函數,被賦值給了jobType的GenericHandler, 如果消費方法只有一個job參數,則執行GenericHandler
    jt.IsGeneric = true
    jt.GenericHandler = gh
  }

  wp.jobTypes[name] = jt

  for _, w := range wp.workers {
    w.updateMiddlewareAndJobTypes(wp.middleware, wp.jobTypes)
  }

  return wp
}

 

執行消費任務的真正代碼

 

worker對象的processJob(job * Job)方法 調用了runJob方法執行GenericHandler or DynamicHandler.Call。

 

func runJob(job *Job, ctxType reflect.Type, middleware []*middlewareHandler, jt *jobType) (returnCtx reflect.Value, returnError error) {
  。。。。
  next = func() error {
    。。。。
    if jt.IsGeneric {
      // 任務消費方法沒有ctx時候執行
      return jt.GenericHandler(job)
    }
    
    // 任務消費方法有ctx時執行
    res := jt.DynamicHandler.Call([]reflect.Value{returnCtx, reflect.ValueOf(job)})
    x := res[0].Interface()
    if x == nil {
      return nil
    }
    return x.(error)
  }
  ...
  returnError = next()

  return
}

 

LStack產品簡介

 

面向行業應用開發商(ISV/SI)提供混合云/邊緣云場景下云原生應用開發測試、交付、運維一站式服務,幫助企業采用云原生敏捷開發交付方法論,從而提高軟件開發人員效率、減少運維成本,加快數字化轉型,并最終實現業務創新。

分享到:
標簽:gocraft work
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定