亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.430618.com 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

眾所周知,flink在開啟checkpoint之后,source 任務收到觸發檢查點保存的指令后,會立即在當前處理的數據中插入一個標識字段(Barrier),然后再向下游任務發出。我們平時使用比較多的是對齊的barrier,那你知道非對齊的barrier嗎?如何使用呢?讓我們通過下面的閱讀一起了解一下吧。

一、Barrier

流的barrier是Flink Checkpoint中的一個核心概念。多個barrier被插入到數據流中, 然后作為數據流的一部分隨著數據流動(有點類似于Watermark)。這些barrier不會跨越流中的數據。

每個barrier會把數據流分成兩部分:一部分數據進入當前的快照,另一部分數據進入下一個快照。每個barrier攜帶著快照的id。barrier 不會暫停數據的流動,所以非常輕量級。在流中, 同一時間可以有來源于多個不同快照的多個barrier,這意味著可以并發地出現不同的快照。

二、對齊的barrier

在多并行度下,如果要實現嚴格一次,則要執行barrier對齊。當 job graph 中的每個 operator 接收到 barrier 時,就會記錄下其狀態。擁有兩個輸入流的 operators(例如 CoProcessFunction)會執行 barrier 對齊(barrier alignment),以便當前快照能夠包含兩個輸入流 barrier 之前(但不超過)的所有 events 產生的狀態。

1. 當operator收到數字流的barrier n時, 它就不能處理(但是可以接收)來自該流的任何數據記錄,直到它從字母流的所有輸入中接收到 barrier n 為止。

2. 接收到 barrier n 的流(數字流)暫時被擱置。從這些流接收的記錄會進入輸入緩沖區, 不會被處理。例如圖中的 barrier n 之后的數據 123 已經到達了operator, 存入到了輸入緩沖區沒有被處理, 只有等到字母流的 barrier n 到達之后才會開始處理。

3. 一旦最后所有輸入流都接收到 barrier n,operator 就會把緩沖區中待輸出的數據發出去,然后把 barrier n 接著往下游發送。這里還會對自身進行快照。

優點:

  • barrier 對齊不僅保證了狀態的準確性,還巧妙地消去了原生C-L算法中記錄輸入流狀態的步驟,十分輕量級,保存的數據體積小。

缺點:

  • 延遲性高(快的barrier到達后會阻塞此條流的數據處理)。
  • 加劇作業的反壓(當出現反壓時,數據本身就處理不過來,此時某條流的數據又阻塞了,所以就會加劇反壓)。
  • 整體 chenkpoint 時間變長(因為反壓會導致數據流速變慢,導致barrier流動速度也會變慢,所以整體chenkpoint時間就會變長)。

三、barrier不對齊

如果barrier不對齊會怎么樣?會重復消費,就是至少一次語義。

1. 當 operator 收到數字流的 barrier n 時,開啟本地快照記錄自己的狀態,并將這個 barrier 發往下游(輸出緩沖區)。

2. 接收到 barrier n 的流(數字流)會繼續往下走。字母流的 barrier n 前面的數據(abcd)會被保存到狀態里面,直到 barrier n 到來以后,再進行checkpoint,將數據保存到檢查點中。

優點:

  • 避免了 checkpoint 可能帶來的阻塞,有利于提升 Flink 的資源利用率。

缺點:

  • 由于要持久化緩存數據,State Size 會有比較大的增長,磁盤負載會加重。
  • 隨著 State Size 增長,作業恢復時間可能增長,運維管理難度增加。

圖片來源于網絡,侵刪

四、barrier的使用

對齊

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每 1000ms 開始一次 checkpoint

env.enableCheckpointing(1000);

// 高級選項:

// 設置模式為精確一次 (這是默認值),對于延遲要求較高的選擇,最少一次

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 確認 checkpoints 之間的時間會進行 500 ms

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// Checkpoint 必須在一分鐘內完成,否則就會被拋棄

env.getCheckpointConfig().setCheckpointTimeout(60000);

// 允許兩個連續的 checkpoint 錯誤

env.getCheckpointConfig().setTolerableCheckpointFAIlureNumber(2);

// 同一時間只允許一個 checkpoint 進行

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 使用 externalized checkpoints,這樣 checkpoint 在作業取消后仍就會被保留

env.getCheckpointConfig().setExternalizedCheckpointCleanup(

ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 開啟實驗性的 unaligned checkpoints

env.getCheckpointConfig().enableUnalignedCheckpoints();

非對齊

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 啟用非對齊 Checkpoint

env.getCheckpointConfig().enableUnalignedCheckpoints();

或者在 flink-conf.yml 配置文件中增加配置:

execution.checkpointing.unaligned: true

五、總結

非對齊barrier主要是解決嚴重反壓情況下作業難以完成 checkpoint 的問題,同時它以磁盤資源為代價,避免了 checkpoint 可能帶來的阻塞,有利于提升 Flink 的資源利用率。

分享到:
標簽:Flink
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定