近年來流計算技術(shù)發(fā)展迅猛,甚至有后來居上一統(tǒng)原本批處理主導的分布式計算之勢,其中 Watermark 機制作為流計算結(jié)果準確性和延遲的橋梁扮演著不可或缺的角色。然而由于缺乏高質(zhì)量的學習資源加上計算 Watermark 確實不是一件容易的事情,不少有著批處理計算背景的用戶在流計算作業(yè)的開發(fā)中可能并不理解 Watermark 的重要意義,從而多走了很多彎路。為此,本文將基于筆者的學習積累和開發(fā)經(jīng)驗,談?wù)剛€人對 Watermark 的理解,希望起到拋磚引玉的作用。
本文將首先說明 Watermark 提出的背景,然后詳細解析 Watermark 的原理,最后結(jié)合工業(yè)案例說明 Watermark 在實踐中如何被應(yīng)用。
Watermark 背景
自 google 的三篇論文和 Hadoop 出現(xiàn)后,工業(yè)界的分布式計算技術(shù)進入了百花齊放的時期,然而相比于離線批處理計算的蓬勃發(fā)展,作為后來者的流計算卻有點停滯不前。流計算和批處理在對于每條記錄的單獨處理上基本一致,不同之處在于聚合類的計算。批處理計算結(jié)果的輸出依賴于輸入數(shù)據(jù)集合的結(jié)束,而流計算的輸入數(shù)據(jù)集通常是無邊界的,不可能等待輸入結(jié)束再輸出結(jié)果。針對這個問題流處理引入了窗口的特性,簡單來說就是將無限的數(shù)據(jù)流按照時間范圍切分為一個個有限的數(shù)據(jù)集,所以我們依然能夠沿用批處理的計算模型。來到這時,業(yè)界在流計算和批處理的關(guān)系上出現(xiàn)了兩種截然不同的觀點,一個觀點認為流計算是批處理的特例,另一個觀點則認為批處理是流處理的特例。
實時計算與離線計算的分離
流計算是批處理的特例的觀點在早期占據(jù)了主導的地位,其中最為典型的便是以 Spark Streaming 為代表的 micro-batching 類型的實時處理框架的流行。Micro-batching 的主要思想是以分鐘甚至秒級別的執(zhí)行間隔來將批處理應(yīng)用到數(shù)據(jù)流上,但不久后人們意識到這種計算模型依然不能完全滿足低延遲高準確性的要求,主要問題除了批處理調(diào)度導致的延遲外,還有一點是窗口變小后,數(shù)據(jù)收集延遲對結(jié)果準確性的影響大大增強了。比如說計算一個游戲服務(wù)器每 5 min 的新登錄玩家數(shù),但因為網(wǎng)絡(luò)或者客戶端設(shè)備故障等因素,12:00 的玩家登錄日志可能在 12:10 才被收集到服務(wù)器,如果實時計算在 12:05:00 就輸出結(jié)果,必然會漏掉這條遲到的數(shù)據(jù)。在離線計算中這樣的問題并不明顯,因為一個批次的時間跨度較大且對延遲要求不高,因此計算的時間可以設(shè)置一個安全的延遲,比如 1 個小時,確保數(shù)據(jù)都已經(jīng)收集完成后再開始計算,即使有大量數(shù)據(jù)是在 1 個小時后才收集到,只需要重算結(jié)果即可。然而這樣的實踐經(jīng)驗并不能應(yīng)用于實時計算,一是引入額外的安全延遲對于很多對延遲敏感的場景不可接受,二是實時計算的重算要比批處理重算的成本高出很多。因此業(yè)界普遍是采用 結(jié)合離線和實時處理的 Lambda 架構(gòu)來應(yīng)對這個問題,其主要思想是同時運行實時和離線兩個數(shù)據(jù)處理管道,實時管道提供最近小時內(nèi)的臨時結(jié)算結(jié)果,而離線管道提供小時以前的計算結(jié)果并覆蓋掉對應(yīng)時間段的實時計算結(jié)果,查詢時將兩者的結(jié)果再進行合并產(chǎn)生最終的結(jié)果 [1] 。
實時計算與離線計算的融合
實時計算與離線計算的分離說明了用批處理模型不足以表達流計算,于是人們開始探索批處理是流計算特例的模型。2015 年 Google 發(fā)表名為 The Dataflow Model 的論文,這篇論文較為詳細地闡述了實時流計算和離線批計算的統(tǒng)一模型(出于篇幅原因不展開講,詳情請見 [2] ),而該模型基于批處理是流計算特例的觀點。The Dataflow Model 將計算分為四個要素,即 what、where、when 和 how:
- what 表示要計算什么結(jié)果,即對數(shù)據(jù)的一系列轉(zhuǎn)換操作;
- where 表示結(jié)果計算上下文,即窗口如何定義;
- when 表示何時輸出和物化計算結(jié)果;
- how 表示如何清理已經(jīng)輸出的結(jié)果。
在 what 和 where 兩點上流計算和批處理是相似的,而主要不同之處在于 when 和 how 兩點,這兩點在批處理里基本不會涉及,但在流計算里卻影響著計算結(jié)果的準確性,實際上它們分別對應(yīng)了上文所說的批處理經(jīng)驗不能應(yīng)用于實時計算的兩個問題。本文主要討論的 watermark 就是屬于 when 要素里的一種技術(shù),因而下文將主要關(guān)注 when。
在批處理中 when 是輸入數(shù)據(jù)集結(jié)束的時候,how 是以覆蓋的形式來清理之前的輸出結(jié)果,處理模式都是固定的,因此用戶并不需要考慮。舉個例子,假設(shè)要計算一個游戲每天的玩家充值金額,用離線計算時我們會考慮如何將充值金額從日志中提取出來并累加到一起,此為 what;再考慮批處理的運行時間,比如每天 00:30,所以每次計算是處理 24 小時采集到的數(shù)據(jù),此為 where;而批處理的 when 是和 where 綁定的,即 00:30 計算開始,結(jié)束后馬上輸出結(jié)果;至于 how,不同批次的批處理運行的結(jié)果是互不相干的,同一批次的運行結(jié)果會覆蓋前一次運行的結(jié)果。
然而如果游戲策劃急于知道某個活動是否有帶動玩家充值,希望看到每分鐘更新的實時數(shù)據(jù),那么上述題目改為用實時流計算去實現(xiàn),此時要考慮的東西會復(fù)雜一點。首先,我們可以依舊可以復(fù)用批處理的 what 和 where,即定義一個時間范圍為 24 小時的窗口,計算邏輯和之前一樣;在 when 方面,為了可以實時地得到最新的計算結(jié)果,我們需要定義每分鐘輸出一次最新的計算結(jié)果,直到達到 24 小時后輸出最終結(jié)果;而在 how 方面,我們每次的輸出結(jié)果只需要覆蓋之前的結(jié)果即可。然而 when 的問題并沒有這么簡單。還記得我們之前說過數(shù)據(jù)采集延遲嗎?可能一個用戶充值的時間在 16:00,但中間采集的延遲可能有 1 min,導致到達服務(wù)器卻是 16:01 分,如果基于充值記錄被處理的時間(即 processing time)來進行窗口劃分,用戶充值記錄可能會被計入錯誤的窗口,所以我們應(yīng)該以用戶充值這個時間(即 event time)發(fā)生的時間為準。這里的難點在于我們計算時并不能判斷所有 event time 窗口內(nèi)的數(shù)據(jù)被收集完,因為數(shù)據(jù)的延遲是不可預(yù)知的,這被稱為窗口完整性問題。針對窗口完整性問題,The Dataflow Model 提出了 Watermark 的解決方案。
Watermark 原理解析
Watermark 并沒有很正式的官方定義,最接近定義的是 Streaming 102 [3] 里的一段描述。
A watermark is a notion of input completeness with respect to event times. A watermark with a value of time X makes the statement: “all input data with event times less than X have been observed.” As such, watermarks act as a metric of progress when observing an unbounded data source with no known end.
簡單來說 Watermark 是一個時間戳,表示已經(jīng)收集完畢的數(shù)據(jù)的最大 event time,換句話說 event time 小于 Watermark 的數(shù)據(jù)不應(yīng)該再出現(xiàn),基于這個前提我們才有可能將 event time 窗口視為完整并輸出結(jié)果。Watermark 設(shè)計的初衷是處理 event time 和 processing time 之間的延遲問題,三者的關(guān)系可以用下圖展示:
理想的情況下數(shù)據(jù)沒有延遲,因此 processing time 是等于 event time 的,理想的 Watermark 應(yīng)該是斜率為 45 度的直線。然而在真實環(huán)境下,processing time 和 event time 之間總有不確定的延遲,表現(xiàn)出來的 Watermark 會類似圖 1 中的紅色的曲線。其中紅色曲線與理想 Watermark 的縱坐標差值稱為 processing-time lag,表示在真實世界中的數(shù)據(jù)延遲,而橫坐標的差值表示 event-time skew,表示該延遲帶來的 event-time 落后量。
Watermark 通常是基于已經(jīng)觀察到的數(shù)據(jù)的 event time 來判斷(當然也可以引入 processing time 或者其他外部參數(shù)),具體需要用戶根據(jù)數(shù)據(jù)流的 event time 特征來決定,比如最簡單的算法就是取目前為止觀察到的最大 event time。在數(shù)據(jù)流真實 event time 曲線是單調(diào)非減的情況下,比如 event time 是 Kafka producer timestamp 時,我們是可以計算出完美符合實際的 Watermark 的,然而絕大多數(shù)情況下數(shù)據(jù)流的 event time 都是亂序的,因此計算完美的 Watermark 是不現(xiàn)實的(實際上也是沒有必要的),通常我們會以啟發(fā)性的 Watermark 算法來代替。
啟發(fā)性的 Watermark 算法目的在于在計算結(jié)果的延遲和準確性之間找到平衡點。如果采用激進的 Watermark 算法,那么 Watermark 會快于真實的 event time,導致在窗口數(shù)據(jù)還不完整的情況下過早輸?shù)爻鲇嬎憬Y(jié)果,影響數(shù)據(jù)的準確性;如果采用保守的 Watermark 算法,那么 Watermark 會落后于真實的 event time,導致窗口數(shù)據(jù)收集完整后不能及時輸出計算結(jié)果,造成數(shù)據(jù)的延遲。實際上上文所說的 Watermark 取觀察到的最大 event time 和批處理使用的設(shè)置一個足夠大的安全延遲的辦法分別就屬于 Watermark 算法的兩個極端。很多情況下用戶偏向于犧牲一定的延時來換取準確性,不過在像金融行業(yè)的欺詐檢測場景中,低延遲是首要的,否則準確性再高也沒有意義。針對這種情況 The Dataflow Model 提供了 allow lateness 的機制,工作的原理是用戶可以設(shè)置一個時間閾值,如果在計算結(jié)果輸出后的這個閾值時間內(nèi)發(fā)現(xiàn)遲到的數(shù)據(jù),計算結(jié)果會被重新計算和輸出,但如果超出這個閾值的遲到數(shù)據(jù)就會被丟棄。
這時你們可以看到要開發(fā)一個高質(zhì)量的實時作業(yè)是多么不易了,這也是很多實時應(yīng)用開發(fā)者最為頭疼的地方,或許以后利用機器學習去計算 Watermark 是個不錯的主意(然后我們的工作就可以愉快地從調(diào) Watermark 算法參數(shù)變?yōu)檎{(diào)機器學習模型參數(shù)了 :) )。
Watermark 實踐
接下來我們將結(jié)合工業(yè)生產(chǎn)的案例來說明實戰(zhàn)中 Watermark 是如何影響流計算的。Watermark 在不同計算引擎的實現(xiàn)并不相同,本文將以筆者使用最多的 Apache Flink (下文簡稱 Flink)作為例子來說明。
對于游戲行業(yè)來說,游戲的日活躍玩家數(shù)是個很常見的指標,游戲策劃或者運營通常可以根據(jù)日活躍玩家數(shù)的變動來實時地監(jiān)控某個活動是否收到玩家歡迎的程度,但是游戲可能有海外服務(wù)器,數(shù)據(jù)收集的延遲可能差別較大,造成數(shù)據(jù)流 event time 亂序比較嚴重,在這種情況下設(shè)計 Watermark 算法是個比較大的挑戰(zhàn)。
假設(shè)我們有 A、B、C 共 3 臺服務(wù)器,其中 A、B 為國內(nèi)服務(wù)器,延遲較低且穩(wěn)定,而 C 為海外服務(wù)器,延遲較高且不穩(wěn)定,而我們需要計算每分鐘內(nèi)的登錄玩家數(shù)。
我們現(xiàn)在面臨兩種可能帶來 event time 亂序的因素:一是不同服務(wù)器間的延遲不同,比如可能先收到服務(wù)器 A 在 t2 的數(shù)據(jù),再收到服務(wù) C 在 t1 的數(shù)據(jù);二是同一服務(wù)器的不同數(shù)據(jù)的延遲不同,比如可能先收到服務(wù)器 C t2 的數(shù)據(jù)再收到 t1 的數(shù)據(jù)。針對第二種因素,我們可以對不同服務(wù)器的數(shù)據(jù)分別計算 Watermark,再取其中的最小值作為 Watermark,而針對第一種因素,我們則需要設(shè)計出針對單個服務(wù)器數(shù)據(jù)流的合理 Watermark 算法。
在算法實現(xiàn)上,F(xiàn)link 提供兩種觸發(fā) Watermark 更新的方法,即在收到特殊的消息時觸發(fā)或者定時觸發(fā),我們這里將選用定時觸發(fā)的方法。因為窗口是一分鐘比較小,我們這里將定時的間隔設(shè)為 5 秒,也就是說 Watermark 大約落后真實 Watermark 5 秒,然后這 5 秒內(nèi) Watermark 是不會提升的,所以可以容忍局部的 processing lag。
我們試著取目前為止觀察到的最大時間戳作為 Watermark,那么 Watermark 的效果如下(為了在消費端更加直觀,我們將坐標系調(diào)轉(zhuǎn),現(xiàn)在 x 軸表示 processing time)。
其中 t0-t3 分別表示 Watermark 提升的時間點,黃虛線表示在一個 Watermark 周期內(nèi)的最大 event time,紅線表示 Watermark。可以看到在 t0-t1 的 Watermark 周期內(nèi)出現(xiàn)了輕微的 event time 亂序,但是并不影響計算的準確性。接下來在 t1-t2 和 t2-t3 兩個周期間也發(fā)生了相似的亂序,但是這個亂序并不在同一個 Watermark 周期,因此導致正常延遲的數(shù)據(jù)被誤認為是遲到數(shù)據(jù)。解決方法是引入一定可容忍的 event time skew,比如說最簡單的設(shè)置一個 skew 閾值,即每次計算 Watermark 的結(jié)果都減去這個值。根據(jù)數(shù)據(jù)流延遲的不同,我們還可以給不同服務(wù)器設(shè)置不同的 skew 閾值。
上述 Watermark 算法代碼如下:
public class WatermarkProcessor implements AssignerWithPeriodicWatermarks<UserLogin> {
private static final long ALLOWED_EVENT_TIME_SKEW = 1000L;
private static final Map<String, Long> maxTimestampPerServer = new HashMap<>(3);
@Nullable
public Watermark getCurrentWatermark() {
Optional<Long> maxTimestamp = maxTimestampPerServer.values().stream()
.min(Comparator.comparingLong(Long::valueOf));
if (maxTimestamp.isPresent()) {
return new Watermark(maxTimestamp.get() - ALLOWED_EVENT_TIME_SKEW);
} else{
return null;
}
}
public long extractTimestamp(UserLogin userLogin, long previousElementTimestamp) {
String server = userLogin.getServer();
long eventTime = userLogin.getEventTime();
if (!maxTimestampPerServer.containsKey(server) ||
userLogin.getEventTime() > maxTimestampPerServer.get(server)) {
maxTimestampPerServer.put(server, eventTime);
}
return eventTime;
}
}
總結(jié)
流計算和批處理誰是表達能力更強的計算模式,這個問題或許還將繼續(xù)被爭論下去,不過根據(jù) The Dataflow Model 我們已經(jīng)有足夠的理論支撐來開發(fā)低延遲高準確并且可容錯的流計算應(yīng)用。其中流計算的準確性很大程度上決定于數(shù)據(jù)流時間的亂序程度,因此我們在開發(fā)實時流計算應(yīng)用時,比起開發(fā)離線批處理應(yīng)用,很大的一個不同是要考慮數(shù)據(jù)是以什么順序到達,并針對性地設(shè)計 Watermark 算法來處理數(shù)據(jù)流時間的亂序。Watermark 算法需要平衡低延遲和高準確性兩者,在引入最低延遲成本的情況下準確判斷窗口的計算和輸出結(jié)果的時機,通常可以從 processing lag 和 event time skew 兩者的容忍閾值入手。






