亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網(wǎng)為廣大站長提供免費收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.430618.com 】, 免友鏈快審服務(wù)(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

背景

一談到golang,大家的第一感覺就是高并發(fā),高性能。但是語言本身的優(yōu)勢是不是,就讓程序員覺得編寫高性能的處理系統(tǒng)變得輕而易舉,水到渠成呢。下面這篇文章給大家的提醒便是,我們只有在充分理解語言本身的特性,并巧妙加以利用的前提下,才能寫出高性能、高并發(fā)的處理程序,才能為企業(yè)節(jié)省成本,為客戶提供好的服務(wù)。

每分鐘處理百萬請求

?Malwarebytes的首席架構(gòu)師Marcio Castilho分享了他在公司高速發(fā)展過程中,開發(fā)高性能數(shù)據(jù)處理系統(tǒng)的經(jīng)歷。整個過程向我們詳細(xì)展示了如何不斷的優(yōu)化與提升系統(tǒng)性能的過程,值得我們思考與學(xué)習(xí)。大佬也不是一下子就給出最優(yōu)方案的。

首先作者的目標(biāo)是能夠處理來自數(shù)百萬個端點的大量POST請求,然后將接收到的JSON 請求體,寫入Amazon S3,以便map-reduce稍后對這些數(shù)據(jù)進(jìn)行操作。這個場景和我們現(xiàn)在的很多互聯(lián)網(wǎng)系統(tǒng)的場景是一樣的。傳統(tǒng)的處理方式是,使用隊列等中間件,做緩沖,消峰,然后后端一堆worker來異步處理。因為作者也做了兩年GO開發(fā)了,經(jīng)過討論他們決定使用GO來完成這項工作。

第一版代碼

下面是Marcio給出的本能第一反應(yīng)的解決方案,和大家的思路是不是一致的。首先他給出了負(fù)載(Payload)還有負(fù)載集合(PayloadCollection)的定義,然后他寫了一個處理web請求的Handler(payloadHandler)。在payloadHandler里面,由于把負(fù)載上傳S3比較耗時,所以針對每個負(fù)載,啟動GO的協(xié)程來異步上傳。具體的實現(xiàn),大家可以看下面48-50行貼出的代碼。

type PayloadCollection struct {
    windowsVersion  string    `json:"version"`
    Token           string    `json:"token"`
    Payloads        []Payload `json:"data"`
}

type Payload struct {
    // [redacted]
}

func (p *Payload) UploadToS3() error {
    // the storageFolder method ensures that there are no name collision in
    // case we get same timestamp in the key name
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(payload)
    if encodeErr != nil {
        return encodeErr
    }

    // Everything we post to the S3 bucket should be marked 'private'
    var acl = s3.Private
    var contentType = "Application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- DON'T DO THIS
    }

    w.WriteHeader(http.StatusOK)
}

那結(jié)果怎么樣呢?Marcio和他的同事們低估了請求的量級,而且上面的實現(xiàn)方法,又無法控制GO協(xié)程的生成數(shù)量,這個版本部署到生產(chǎn)后,很快就崩潰了。Marcio畢竟是牛逼架構(gòu)師,他很快根據(jù)問題給出了新的解決方案。

第二版代碼

第一個版本的假設(shè)是,請求的生命周期都是很短的,不會有長時間的阻塞操作耗費資源。在這個前提下,我們可以根據(jù)請求不停的生成GO協(xié)程來處理請求。但是事實并非如此,Marcio轉(zhuǎn)變思路,引入隊列的思想。創(chuàng)建了Buffered Channel,把請求緩沖起來,然后再通過一個同步處理器從Channel里面把請求取出,上傳S3.這是典型的生產(chǎn)者-消費者模型。

處理流程

這個版本的問題是,首先同步處理器的處理能力有限,他的處理能力比不上請求到達(dá)的速度。很快Buffered Channel就會滿了,然后后續(xù)的客戶請求都會被阻塞。在Marcio他們部署這個有缺陷的版本幾分鐘后,延遲率會以固定的速率增加。

系統(tǒng)部署后的延遲

第三版代碼

Marcio引入了2層Channel,一個Channel用于緩存請求,是一個全局Channel,本文中就是下面的JobQueue,一個Channel用于控制每個請求隊列并發(fā)多少個worker.從下面的代碼可以看到,每個Worker都有兩個關(guān)鍵屬性,一個是WorkerPool(這個也是一個全局的變量,即所有的worker的這個屬性都指向同一個,worker在創(chuàng)建后,會把自身的JobChannel寫入WorkerPool完成注冊),一個是JobChannel(用于緩存分配需要本worker處理的請求作業(yè))。web處理請求payloadHandler,會把接收到的請求放到JobQueue后,就結(jié)束并返回。

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
    Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
    WorkerPool  chan chan Job
    JobChannel  chan Job
    quit        chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // we have received a work request.
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // we have received a signal to stop
                return
            }
        }
    }()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {

        // let's create a job with the payload
        work := Job{Payload: payload}

        // Push the work onto the queue.
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

請求任務(wù)都放到JobQueue里面了,如何監(jiān)聽隊列,并觸發(fā)請求呢。這個地方又出現(xiàn)了Dispatcher,我們在另一篇文章中有詳細(xì)探討(基于dispatcher模式的事件與數(shù)據(jù)分發(fā)處理器的Go語言實現(xiàn):
https://www.toutiao.com/article/7186518439215841827/)。在系統(tǒng)啟動的時候,我們會通過NewDispatcher生成Dispatcher,并調(diào)用它的Run方法。

type Dispatcher struct {
    // A pool of workers channels that are registered with the dispatcher
    WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 0; i < d.maxWorkers; i++ {
        worker := NewWorker(d.pool)
        worker.Start()
    }

    go d.dispatch()
}

func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            // a job request has been received
            go func(job Job) {
                // try to obtain a worker job channel that is available.
                // this will block until a worker is idle
                jobChannel := <-d.WorkerPool

                // dispatch the job to the worker job channel
                jobChannel <- job
            }(job)
        }
    }
}

Dispatcher與Worker的關(guān)系如下圖所示:

第三方案整體流程

1.客戶請求到Handler。

2.Handler把請求作業(yè)寫入JobQueue。

3.Dispatcher的dispatcher方法,從全局JobQueue中讀取Job。

4.Dispatcher的dispatcher方法同時也從WorkerPool中讀取JobChannel(屬于某一個Worker,即每一個Worker都有一個JobChannel)。

5.Dispatcher把獲得的Job寫入JobChannel,即分配某個Worker。

6.Worker從自己的JobChannel中獲取作業(yè)并執(zhí)行。執(zhí)行完成后,空閑后,把自己的JobChannel再次寫入WorkerPool等待分配。

這樣實現(xiàn)后,效果明顯,同時需要的機器數(shù)量大幅降低了,從100臺降低到20臺。

第三方案效果

部署機器變化

這里的兩層,一層是全局JobQueue,緩存任務(wù)。第二個是每個Worker都有自己的執(zhí)行隊列,一臺機器可以創(chuàng)建多個Worker。這樣就提升了處理能力。

方案對比

方案思想

實現(xiàn)難度

方案問題

GO協(xié)程原生方法

簡單

無法應(yīng)對大規(guī)模請求,無法控制協(xié)程數(shù)量

GO 單層Channel

簡單

當(dāng)處理能力達(dá)不到請求速率后,隊列滿,系統(tǒng)崩潰

GO兩層Channel

復(fù)雜

 

參考資料:

http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/。

https://github.com/ReGYChang/zero/blob/main/pkg/utils/worker_pool.go。

分享到:
標(biāo)簽:Golang
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨大挑戰(zhàn)2018-06-03

數(shù)獨一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運動步數(shù)有氧達(dá)人2018-06-03

記錄運動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定