眾所周知,flink在開啟checkpoint之后,source 任務收到觸發檢查點保存的指令后,會立即在當前處理的數據中插入一個標識字段(Barrier),然后再向下游任務發出。我們平時使用比較多的是對齊的barrier,那你知道非對齊的barrier嗎?如何使用呢?讓我們通過下面的閱讀一起了解一下吧。
一、Barrier
流的barrier是Flink Checkpoint中的一個核心概念。多個barrier被插入到數據流中, 然后作為數據流的一部分隨著數據流動(有點類似于Watermark)。這些barrier不會跨越流中的數據。
每個barrier會把數據流分成兩部分:一部分數據進入當前的快照,另一部分數據進入下一個快照。每個barrier攜帶著快照的id。barrier 不會暫停數據的流動,所以非常輕量級。在流中, 同一時間可以有來源于多個不同快照的多個barrier,這意味著可以并發地出現不同的快照。

二、對齊的barrier
在多并行度下,如果要實現嚴格一次,則要執行barrier對齊。當 job graph 中的每個 operator 接收到 barrier 時,就會記錄下其狀態。擁有兩個輸入流的 operators(例如 CoProcessFunction)會執行 barrier 對齊(barrier alignment),以便當前快照能夠包含兩個輸入流 barrier 之前(但不超過)的所有 events 產生的狀態。

1. 當operator收到數字流的barrier n時, 它就不能處理(但是可以接收)來自該流的任何數據記錄,直到它從字母流的所有輸入中接收到 barrier n 為止。
2. 接收到 barrier n 的流(數字流)暫時被擱置。從這些流接收的記錄會進入輸入緩沖區, 不會被處理。例如圖中的 barrier n 之后的數據 123 已經到達了operator, 存入到了輸入緩沖區沒有被處理, 只有等到字母流的 barrier n 到達之后才會開始處理。
3. 一旦最后所有輸入流都接收到 barrier n,operator 就會把緩沖區中待輸出的數據發出去,然后把 barrier n 接著往下游發送。這里還會對自身進行快照。
優點:
- barrier 對齊不僅保證了狀態的準確性,還巧妙地消去了原生C-L算法中記錄輸入流狀態的步驟,十分輕量級,保存的數據體積小。
缺點:
- 延遲性高(快的barrier到達后會阻塞此條流的數據處理)。
- 加劇作業的反壓(當出現反壓時,數據本身就處理不過來,此時某條流的數據又阻塞了,所以就會加劇反壓)。
- 整體 chenkpoint 時間變長(因為反壓會導致數據流速變慢,導致barrier流動速度也會變慢,所以整體chenkpoint時間就會變長)。
三、barrier不對齊
如果barrier不對齊會怎么樣?會重復消費,就是至少一次語義。

1. 當 operator 收到數字流的 barrier n 時,開啟本地快照記錄自己的狀態,并將這個 barrier 發往下游(輸出緩沖區)。
2. 接收到 barrier n 的流(數字流)會繼續往下走。字母流的 barrier n 前面的數據(abcd)會被保存到狀態里面,直到 barrier n 到來以后,再進行checkpoint,將數據保存到檢查點中。
優點:
- 避免了 checkpoint 可能帶來的阻塞,有利于提升 Flink 的資源利用率。
缺點:
- 由于要持久化緩存數據,State Size 會有比較大的增長,磁盤負載會加重。
- 隨著 State Size 增長,作業恢復時間可能增長,運維管理難度增加。
圖片來源于網絡,侵刪
四、barrier的使用
對齊
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 1000ms 開始一次 checkpoint
env.enableCheckpointing(1000);
// 高級選項:
// 設置模式為精確一次 (這是默認值),對于延遲要求較高的選擇,最少一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 確認 checkpoints 之間的時間會進行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoint 必須在一分鐘內完成,否則就會被拋棄
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 允許兩個連續的 checkpoint 錯誤
env.getCheckpointConfig().setTolerableCheckpointFAIlureNumber(2);
// 同一時間只允許一個 checkpoint 進行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 使用 externalized checkpoints,這樣 checkpoint 在作業取消后仍就會被保留
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 開啟實驗性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
非對齊
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 啟用非對齊 Checkpoint
env.getCheckpointConfig().enableUnalignedCheckpoints();
或者在 flink-conf.yml 配置文件中增加配置:
execution.checkpointing.unaligned: true
五、總結
非對齊barrier主要是解決嚴重反壓情況下作業難以完成 checkpoint 的問題,同時它以磁盤資源為代價,避免了 checkpoint 可能帶來的阻塞,有利于提升 Flink 的資源利用率。






