亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.430618.com 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

流處理(Stream processing)是一種計算機編程范式,其允許給定一個數據序列(流處理數據源),一系列數據操作(函數)被應用到流中的每個元素。同時流處理工具可以顯著提高程序員的開發效率,允許他們編寫有效、干凈和簡潔的代碼。

流數據處理在我們的日常工作中非常常見,舉個例子,我們在業務開發中往往會記錄許多業務日志,這些日志一般是先發送到Kafka,然后再由Job消費Kafaka寫到elasticsearch,在進行日志流處理的過程中,往往還會對日志做一些處理,比如過濾無效的日志,做一些計算以及重新組合日志等等,示意圖如下:

Golang之流式編程

 

流處理工具fx

gozero是一個功能完備的微服務框架,框架中內置了很多非常實用的工具,其中就包含流數據處理工具fx,下面我們通過一個簡單的例子來認識下該工具:

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/tal-tech/go-zero/core/fx"
)

func main() {
	ch := make(chan int)

	go inputStream(ch)
	go outputStream(ch)

	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
	<-c
}

func inputStream(ch chan int) {
	count := 0
	for {
		ch <- count
		time.Sleep(time.Millisecond * 500)
		count++
	}
}

func outputStream(ch chan int) {
	fx.From(func(source chan<- interface{}) {
		for c := range ch {
			source <- c
		}
	}).Walk(func(item interface{}, pipe chan<- interface{}) {
		count := item.(int)
		pipe <- count
	}).Filter(func(item interface{}) bool {
		itemInt := item.(int)
		if itemInt%2 == 0 {
			return true
		}
		return false
	}).ForEach(func(item interface{}) {
		fmt.Println(item)
	})
}

inputStream函數模擬了流數據的產生,outputStream函數模擬了流數據的處理過程,其中From函數為流的輸入,Walk函數并發的作用在每一個item上,Filter函數對item進行過濾為true保留為false不保留,ForEach函數遍歷輸出每一個item元素。

流數據處理中間操作

一個流的數據處理可能存在許多的中間操作,每個中間操作都可以作用在流上。就像流水線上的工人一樣,每個工人操作完零件后都會返回處理完成的新零件,同理流處理中間操作完成后也會返回一個新的流。

Golang之流式編程

 

fx的流處理中間操作:

操作函數功能輸入Distinct去除重復的itemKeyFunc,返回需要去重的keyFilter過濾不滿足條件的itemFilterFunc,Option控制并發量Group對item進行分組KeyFunc,以key進行分組Head取出前n個item,返回新streamint64保留數量Map對象轉換MapFunc,Option控制并發量Merge合并item到slice并生成新streamReverse反轉itemSort對item進行排序LessFunc實現排序算法Tail與Head功能類似,取出后n個item組成新streamint64保留數量Walk作用在每個item上WalkFunc,Option控制并發量

下圖展示了每個步驟和每個步驟的結果:

Golang之流式編程

 

用法與原理分析

From

通過From函數構建流并返回Stream,流數據通過channel進行存儲:

// 例子
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
  for _, v := range s {
    source <- v
  }
})

// 源碼
func From(generate GenerateFunc) Stream {
	source := make(chan interface{})

	go func() {
		defer close(source)
    // 構造流數據寫入channel
		generate(source)
	}()

	return Range(source)
}

Filter

Filter函數提供過濾item的功能,FilterFunc定義過濾邏輯true保留item,false則不保留:

// 例子 保留偶數
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
  for _, v := range s {
    source <- v
  }
}).Filter(func(item interface{}) bool {
  if item.(int)%2 == 0 {
    return true
  }
  return false
})

// 源碼
func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
	return p.Walk(func(item interface{}, pipe chan<- interface{}) {
    // 執行過濾函數true保留,false丟棄
		if fn(item) {
			pipe <- item
		}
	}, opts...)
}

Group

Group對流數據進行分組,需定義分組的key,數據分組后以slice存入channel:

// 例子 按照首字符"g"或者"p"分組,沒有則分到另一組
ss := []string{"golang", "google", "php", "Python", "JAVA", "c++"}
fx.From(func(source chan<- interface{}) {
		for _, s := range ss {
			source <- s
		}
	}).Group(func(item interface{}) interface{} {
		if strings.HasPrefix(item.(string), "g") {
			return "g"
		} else if strings.HasPrefix(item.(string), "p") {
			return "p"
		}
		return ""
	}).ForEach(func(item interface{}) {
		fmt.Println(item)
	})
}

// 源碼
func (p Stream) Group(fn KeyFunc) Stream {
  // 定義分組存儲map
	groups := make(map[interface{}][]interface{})
	for item := range p.source {
    // 用戶自定義分組key
		key := fn(item)
    // key相同分到一組
		groups[key] = Append(groups[key], item)
	}

	source := make(chan interface{})
	go func() {
		for _, group := range groups {
      // 相同key的一組數據寫入到channel
			source <- group
		}
		close(source)
	}()

	return Range(source)
}

Reverse

reverse可以對流中元素進行反轉處理:

Golang之流式編程

 

// 例子
fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {
  fmt.Println(item)
})

// 源碼
func (p Stream) Reverse() Stream {
	var items []interface{}
  // 獲取流中數據
	for item := range p.source {
		items = append(items, item)
	}
	// 反轉算法
	for i := len(items)/2 - 1; i >= 0; i-- {
		opp := len(items) - 1 - i
		items[i], items[opp] = items[opp], items[i]
	}
	
  // 寫入流
	return Just(items...)
}

Distinct

distinct對流中元素進行去重,去重在業務開發中比較常用,經常需要對用戶id等做去重操作:

// 例子
fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {
  return item
}).ForEach(func(item interface{}) {
  fmt.Println(item)
})
// 結果為 1,2,3,4,5,6

// 源碼
func (p Stream) Distinct(fn KeyFunc) Stream {
	source := make(chan interface{})

	threading.GoSafe(func() {
		defer close(source)
		// 通過key進行去重,相同key只保留一個
		keys := make(map[interface{}]lang.PlaceholderType)
		for item := range p.source {
			key := fn(item)
      // key存在則不保留
			if _, ok := keys[key]; !ok {
				source <- item
				keys[key] = lang.Placeholder
			}
		}
	})

	return Range(source)
}

Walk

Walk函數并發的作用在流中每一個item上,可以通過WithWorkers設置并發數,默認并發數為16,最小并發數為1,如設置unlimitedWorkers為true則并發數無限制,但并發寫入流中的數據由defaultWorkers限制,WalkFunc中用戶可以自定義后續寫入流中的元素,可以不寫入也可以寫入多個元素:

// 例子
fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {
  newItem := strings.ToUpper(item.(string))
  pipe <- newItem
}).ForEach(func(item interface{}) {
  fmt.Println(item)
})

// 源碼
func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
	pipe := make(chan interface{}, option.workers)

	go func() {
		var wg sync.WaitGroup
		pool := make(chan lang.PlaceholderType, option.workers)

		for {
      // 控制并發數量
			pool <- lang.Placeholder
			item, ok := <-p.source
			if !ok {
				<-pool
				break
			}

			wg.Add(1)
			go func() {
				defer func() {
					wg.Done()
					<-pool
				}()
				// 作用在每個元素上
				fn(item, pipe)
			}()
		}

    // 等待處理完成
		wg.Wait()
		close(pipe)
	}()

	return Range(pipe)
}

并發處理

fx工具除了進行流數據處理以外還提供了函數并發功能,在微服務中實現某個功能往往需要依賴多個服務,并發的處理依賴可以有效的降低依賴耗時,提升服務的性能。

Golang之流式編程

 

fx.Parallel(func() {
  userRPC() // 依賴1
}, func() {
  accountRPC() // 依賴2
}, func() {
  orderRPC() // 依賴3
})

注意fx.Parallel進行依賴并行處理的時候不會有error返回,如需有error返回或者有一個依賴報錯需要立馬結束依賴請求請使用MapReduce工具進行處理。

總結

本篇文章介紹了流處理的基本概念和gozero中的流處理工具fx,在實際的生產中流處理場景應用也非常多,希望本篇文章能給大家帶來一定的啟發,更好的應對工作中的流處理場景。

分享到:
標簽:Golang
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定