![]()
目錄
1. Shopee 數(shù)據(jù)系統(tǒng)建設(shè)中面臨的典型問題
2. 為什么選擇 Hudi
3. Shopee 在 Hudi 落地過程中的實(shí)踐
4. 社區(qū)貢獻(xiàn)
5. 總結(jié)與展望
湖倉一體(LakeHouse)作為大數(shù)據(jù)領(lǐng)域的重要發(fā)展方向,提供了流批一體和湖倉結(jié)合的新場景。目前,企業(yè)許多業(yè)務(wù)中會遇到的數(shù)據(jù)及時性、準(zhǔn)確性,以及存儲的成本等問題,都可以通過湖倉一體方案得到解決。 當(dāng)下,幾個主流的湖倉一體開源方案都在不斷迭代開發(fā)中,業(yè)界的應(yīng)用也都是在摸索中前行,在實(shí)際的使用中難免會遇到一些不夠完善的地方和未支持的特性。Shopee 內(nèi)部在使用過程中基于開源的 Apache Hudi 定制了自己的版本,以實(shí)現(xiàn)企業(yè)級的應(yīng)用和一些內(nèi)部業(yè)務(wù)需求的新特性。 通過引入 Hudi 的 Data lake 方案,Shopee 的 Data Mart、推薦、ShopeeVideo 等產(chǎn)品的數(shù)據(jù)處理流程實(shí)現(xiàn)了流批一體、增量處理的特性,很大程度上簡化了這一流程,并提升了性能。
1. Shopee 數(shù)據(jù)系統(tǒng)建設(shè)中面臨的典型問題 1.1 Shopee 數(shù)據(jù)系統(tǒng)簡介
上圖是 Shopee Data Infrastructure 團(tuán)隊(duì)為公司內(nèi)部業(yè)務(wù)方提供的一套整體解決方案。
-
第一步是數(shù)據(jù)集成(Data Integration),目前我們提供了基于日志數(shù)據(jù)、數(shù)據(jù)庫和業(yè)務(wù)事件流的數(shù)據(jù)集成方式;
-
然后通過平臺的 ETL(Extract Transform Load)服務(wù) load 到業(yè)務(wù)的數(shù)倉中,業(yè)務(wù)同學(xué)通過我們提供的開發(fā)平臺和計(jì)算服務(wù)進(jìn)行數(shù)據(jù)處理;
-
最后的結(jié)果數(shù)據(jù)通過 Dashboard 進(jìn)行展示,使用即時查詢引擎進(jìn)行數(shù)據(jù)探索,或者通過數(shù)據(jù)服務(wù)反饋到業(yè)務(wù)系統(tǒng)中。
下面先來分析一下 Shopee 數(shù)據(jù)系統(tǒng)建設(shè)中遇到的三個典型問題。
1.2 流批一體的數(shù)據(jù)集成 ![]()
第一個問題:在基于數(shù)據(jù)庫的數(shù)據(jù)集成過程中,存在同一份數(shù)據(jù)同時面臨流處理和批處理的需求。傳統(tǒng)的做法是實(shí)現(xiàn)全量導(dǎo)出和 CDC 兩條鏈路。全量導(dǎo)出鏈路滿足批處理的需求,CDC 鏈路用于實(shí)時處理和增量處理的場景。
然而,這種做法存在的一個問題是全量導(dǎo)出效率低,導(dǎo)致數(shù)據(jù)庫負(fù)載高。另外,數(shù)據(jù)一致性也難以得到保證。
同時,在批數(shù)據(jù)集構(gòu)建上有一定的存儲效率優(yōu)化,所以我們希望基于 CDC 數(shù)據(jù)去構(gòu)建批數(shù)據(jù)集,以此同時滿足三種處理場景的需求,提高數(shù)據(jù)時效性。
1.3 狀態(tài)表明細(xì)存儲 ![]()
第二個問題是狀態(tài)表明細(xì)的存儲。我們可以認(rèn)為,傳統(tǒng)批數(shù)據(jù)集是在某一時間點(diǎn)對業(yè)務(wù)數(shù)據(jù)整體狀態(tài)的一個快照,壓縮到一個點(diǎn)的快照會合并掉業(yè)務(wù)流程中的過程信息。這些變化過程反映了用戶使用我們服務(wù)的過程,是非常重要的分析對象。一旦被合并掉,將無法展開。
另外,在很多場景下,業(yè)務(wù)數(shù)據(jù)每天變化的部分只占全量數(shù)據(jù)的一小部分,每個批次都全量存儲會帶來很大的資源浪費(fèi)。
1.4 大寬表創(chuàng)建 ![]()
第三個問題是大寬表的創(chuàng)建。近實(shí)時寬表構(gòu)建是數(shù)據(jù)處理中常見的一種場景,它存在的問題是傳統(tǒng)的批處理延遲過高,使用流式計(jì)算引擎資源浪費(fèi)嚴(yán)重。因此,我們基于多個數(shù)據(jù)集合構(gòu)建了業(yè)務(wù)寬表,支持 Ad hoc 類 OLAP 查詢。
2. 為什么選擇 Hudi
針對上述業(yè)務(wù)中遇到的問題,基于以下三點(diǎn)考量,最終我們選擇 Apache Hudi 來作為解決方案。
2.1 生態(tài)支持豐富 ![]()
我們期望使用純流式的方式建設(shè)數(shù)據(jù)集成環(huán)境,而 Hudi 對流式場景有著良好的支持。
第二點(diǎn)是對各個大數(shù)據(jù)生態(tài)的兼容。我們構(gòu)建的數(shù)據(jù)集將會同時存在批處理、流處理、增量處理和動態(tài)探索等多種需求的負(fù)載。目前這些工作負(fù)載運(yùn)行在各種計(jì)算引擎中,所以,對多種計(jì)算引擎的支持也在我們的考慮范圍之內(nèi)。
另一個考量點(diǎn)則是和 Shopee 業(yè)務(wù)需求的契合。當(dāng)前,我們亟待處理的數(shù)據(jù)集大部分來源于業(yè)務(wù)系統(tǒng),都帶有唯一性標(biāo)識信息,所以 Hudi 的設(shè)計(jì)更加符合我們的數(shù)據(jù)特性。
2.2 插件化的能力 ![]()
目前我們平臺提供 Flink 和 Spark 作為通用計(jì)算引擎,作為數(shù)據(jù)集成和數(shù)倉建設(shè)負(fù)載的承載者,同時也使用 Presto 承載數(shù)據(jù)探索的功能。Hudi 對這三者都支持。
在實(shí)際使用中,根據(jù)業(yè)務(wù)數(shù)據(jù)的重要程度不同,我們也會給用戶提供不同的數(shù)據(jù)索引方式。
2.3 業(yè)務(wù)特性匹配 ![]()
在數(shù)據(jù)集成過程中,用戶的 schema 變化是一個非常常見的需要。ODS 的數(shù)據(jù)變化可能導(dǎo)致下游的計(jì)算任務(wù)出錯。同時,在增量處理時,我們需要時間處理的語義。支持主鍵數(shù)據(jù)的存儲對于我們業(yè)務(wù)數(shù)據(jù)庫的數(shù)據(jù)來說,意義重大。
3. Shopee 在 Hudi 落地過程中的實(shí)踐 3.1 實(shí)時數(shù)據(jù)集成 ![]()
目前 Shopee 內(nèi)部有大量的業(yè)務(wù)數(shù)據(jù)來自業(yè)務(wù)數(shù)據(jù)庫,我們采用類似 CDC 的技術(shù)獲取數(shù)據(jù)庫中的變更數(shù)據(jù),給業(yè)務(wù)方構(gòu)建支持批處理和近實(shí)時增量處理的 ODS 層數(shù)據(jù)。
當(dāng)一個業(yè)務(wù)方的數(shù)據(jù)需要接入時,我們會在進(jìn)行增量實(shí)時集成之前先做一次全量 Bootstrap,構(gòu)建基礎(chǔ)表,然后基于新接入的 CDC 數(shù)據(jù)進(jìn)行實(shí)時構(gòu)建。
構(gòu)建的過程中,我們一般根據(jù)用戶需求選擇構(gòu)建的 COW 表或者 MOR 表。
1)問題構(gòu)建與解決方案 ![]()
在進(jìn)行實(shí)時構(gòu)建的過程中,存在以下兩種較為常見的問題:
一種是用戶將有大量變更的數(shù)據(jù)集的類型配置為 COW 表,導(dǎo)致數(shù)據(jù)寫放大。此時我們需要做的事情是建立相應(yīng)的監(jiān)控來識別這種配置。同時,我們基于 MOR 表的配置化數(shù)據(jù)合并邏輯,支持?jǐn)?shù)據(jù)文件的同步或者異步更新。
第二個問題是默認(rèn)的 Bloom filter 導(dǎo)致數(shù)據(jù)存在性判斷的問題。這里比較好的方式是采用 HBase Index 解決超大數(shù)據(jù)集的寫入問題。
2)問題解決的效果 ![]()
這是將我們的某些數(shù)據(jù)集成鏈路換成基于 Hudi 的實(shí)時集成后的效果。上圖是數(shù)據(jù)可見性占比與時延的關(guān)系,目前我們能保證 80% 的數(shù)據(jù)在 10 分鐘內(nèi)可見可用,所有的數(shù)據(jù) 15 分鐘內(nèi)可見可用。
下圖是我們統(tǒng)計(jì)的資源消耗占比圖。藍(lán)色部分是實(shí)時鏈路的資源消耗,紅色是歷史的按批數(shù)據(jù)集成的資源消耗。
因?yàn)榍袚Q成了實(shí)時鏈路,對于一些大表重復(fù)率低的數(shù)據(jù)減少了重復(fù)處理,同時也減少了集中式處理效率降低導(dǎo)致的資源消耗。因此,我們的資源消耗遠(yuǎn)低于批處理方式。
3.2 增量視圖 ![]()
針對用戶需要狀態(tài)明細(xì)的場景,我們提供了基于 Hudi Savepoint 功能的服務(wù),按照用戶需要的時間周期,定期構(gòu)建快照(Snapshot),這些快照以分區(qū)的形式存在元數(shù)據(jù)管理系統(tǒng)中。
用戶可以方便地在 Flink、Spark,或者 Presto 中利用 SQL 去使用這些數(shù)據(jù)。因?yàn)閿?shù)據(jù)存儲是完整且沒有合并的明細(xì),所以數(shù)據(jù)本身支持全量計(jì)算,也支持增量處理。
![]()
在使用增量視圖的存儲時,對于一些變化數(shù)據(jù)占比不大的場景,會取得比較好的存儲節(jié)省效果。
這里有一則簡單的公式,用于計(jì)算空間使用率:(1 + (t - 1) * p ) / t。
其中,P 表示變化數(shù)據(jù)的占比,t 表示需要保存的時間周期數(shù)。變化數(shù)據(jù)占比越低,所帶來的存儲節(jié)省越好。對于長周期數(shù)據(jù),也會有一個比較好的節(jié)省效果。
同時,這種方式對增量計(jì)算的資源節(jié)省效果也比較好。缺點(diǎn)是按批全量計(jì)算會有一定的讀放大的問題。
3.3 增量計(jì)算
當(dāng)我們的數(shù)據(jù)集基于 Hudi MOR 表來構(gòu)建時,就可以同時支持批處理、增量處理和近實(shí)時處理負(fù)載。
![]()
以圖為例,Table A 是一個增量的 MOR 表,當(dāng)我們基于 Table A 來構(gòu)建后續(xù)的表 B 和表 C 時,如果計(jì)算邏輯都支持增量的構(gòu)建,那我們在計(jì)算的過程中,只需要獲取新增的數(shù)據(jù)和變化的數(shù)據(jù)。這樣在計(jì)算的過程中就顯著減少了參與計(jì)算的數(shù)據(jù)量。
![]()
這里是離線計(jì)算平臺基于 Hudi 的增量計(jì)算來構(gòu)建的一個近實(shí)時的用戶作業(yè)分析。當(dāng)用戶提交一個 Spark 任務(wù)到集群運(yùn)行,任務(wù)結(jié)束后會自動收集用戶的日志,并從中提取相關(guān)的 Metric 和關(guān)鍵日志寫入到 Hudi 表。然后一個處理任務(wù)增量讀取這些日志,分析出任務(wù)的優(yōu)化項(xiàng),以供用戶參考。
當(dāng)一個用戶作業(yè)運(yùn)行完后,一分鐘之內(nèi)就可以分析出用戶的作業(yè)情況,并形成分析報告提供給用戶。
增量 Join ![]()
![]()
除了增量計(jì)算,增量的 Join 也是一個非常重要的應(yīng)用場景。
相對于傳統(tǒng)的 Join,增量計(jì)算只需要根據(jù)增量數(shù)據(jù)查找到需要讀取的數(shù)據(jù)文件,進(jìn)行讀取,并分析出需要重寫的分區(qū),重新寫入。
相對于全量來說,增量計(jì)算顯著減少了參與計(jì)算的數(shù)據(jù)量。
merge Into
Merge Into 是在 Hudi 中非常實(shí)用的一個用于構(gòu)建實(shí)時寬表的技術(shù),它主要基于 partial update 來實(shí)現(xiàn)。
MERGE INTO target_table t0
USING SOURCE TABLE s0
ON t0.id = s0.id
WHEN matched THEN UPDATE SET
t0.price = s0.price+5,
_ts = s0.ts;MERGE INTO target_table_name [target_alias]
USING source_table_reference [source_alias]
ON merge_condition
[ WHEN MATCHED [ AND condition ] THEN matched_action ] [...]
[ WHEN NOT MATCHED [ AND condition ] THEN not_matched_action ] [...]
matched_action
{ DELETE |
UPDATE SET * |
UPDATE SET { column1 = value1 } [, ...] }
not_matched_action
{ INSERT * |
INSERT (column1 [, ...] ) VALUES (value1 [, ...])
這里展示了基于 Spark SQL 的 Merge Into 語法,它讓用戶構(gòu)建寬表的作業(yè)開發(fā)變得非常簡單。
基于 Merge Into 的增量 Join 實(shí)現(xiàn) ![]()
Hudi 的實(shí)現(xiàn)是采用 Payload 的方式,在一個 Payload 中可以只存在一張表的部分列。
增量數(shù)據(jù)的 Payload 被寫入到 log 文件中,然后在后續(xù)的合并中生成用戶使用的寬表。因?yàn)楹罄m(xù)合并存在時間延遲,所以我們優(yōu)化了合并的寫入邏輯。
在數(shù)據(jù)合并完成后,我們會在元數(shù)據(jù)管理中寫入一個合并的數(shù)據(jù)時間和相關(guān)的 DML,然后在讀取這張 MOR 表的過程中分析 DML 和時間,為數(shù)據(jù)可見性提供保障。
![]()
而采用 Partial Update 的好處是:
-
顯著降低了流式構(gòu)建大寬表的資源使用;
-
文件級別的數(shù)據(jù)修改時,處理效率增高。
4. 社區(qū)貢獻(xiàn)
在解決處理 Shopee 內(nèi)部業(yè)務(wù)問題的同時,我們也貢獻(xiàn)了一批代碼到社區(qū),將內(nèi)部的優(yōu)化和新特性分享出來,比較大的 feature 有 Meta sync(RFC-55 已完成)、snapshot view(RFC-61)、partial update(HUDI-3304)、FileSystemLocker(HUDI-4065 已完成)等等;同時也幫助社區(qū)修復(fù)了很多 bug。后續(xù)也希望能夠用這種方式,更好地滿足業(yè)務(wù)需求的同時,參與社區(qū)共建。
4.1 snapshot View
增量視圖(snapshot view)有以下幾個典型應(yīng)用場景:
-
每天在基礎(chǔ)表上生成名稱為
compacted-YYYYMMDD的快照,用戶使用快照表生成每日的衍生數(shù)據(jù)表,并計(jì)算報表數(shù)據(jù)。當(dāng)用戶下游的計(jì)算邏輯發(fā)生變化時,能夠選擇對應(yīng)快照進(jìn)行重新計(jì)算。還可以設(shè)置留存期為 X 天,每天清理掉過期數(shù)據(jù)。這里其實(shí)也可以在多快照的數(shù)據(jù)上自然地實(shí)現(xiàn) SCD-2。 -
一個命名為
yyyy-archived的存檔分支可以每年在數(shù)據(jù)進(jìn)行壓縮和優(yōu)化之后生成,如果我們的保存策略有變化(例如要刪除敏感信息),那么可以在進(jìn)行相關(guān)的操作之后,在這個分支上生成一個新的快照。 -
一個命名為
preprod-xx的快照可以在進(jìn)行了必要的質(zhì)量檢查之后再正式發(fā)布給用戶,避免外部工具與 pipeline 本身的耦合。
對于 snapshot view 的需求,Hudi 已經(jīng)可以在一定程度上通過兩個關(guān)鍵特性來做支持:
-
Time travel:用戶可以提供一個時間點(diǎn)來查詢對應(yīng)時間上的 Hudi 表快照數(shù)據(jù)。
-
Savepoint:可以保證某個 commit 時間點(diǎn)的快照數(shù)據(jù)不會被清理,而在 savepoint 之外的中間數(shù)據(jù)仍然可以被清理。
簡單的實(shí)現(xiàn)如下圖所示:
![]()
但是在實(shí)際的業(yè)務(wù)場景中,為了滿足用戶的 snapshot view 需求,還需要從易用性和可用性上考慮更多。
例如,用戶如何得知一個 snapshot 已經(jīng)正確地發(fā)布出來了?這其中包含的一個問題是可見性,也就是說,用戶應(yīng)該可以在整個 pipeline 中顯式地拿到 snapshot 表,這里就需要提供類似 Git 的 tag 功能,增強(qiáng)易用性。
另外,在打快照的場景中,一個常見的需求是數(shù)據(jù)的精準(zhǔn)切分。一個例子就是用戶其實(shí)不希望 event time 在 1 號的數(shù)據(jù)漂移到 2 號的快照之中,更希望的做法是在每個 FileGroup 下結(jié)合 watermark 做精細(xì)的 instant 切分。
為了更好地滿足生產(chǎn)環(huán)境中的需求,我們實(shí)現(xiàn)了以下優(yōu)化:
-
擴(kuò)展了 savepoint metadata,在此基礎(chǔ)上實(shí)現(xiàn)快照的 tag、branch 以及 lifecycle 管理,和自動的 meta 同步功能;
-
在 MergeOnRead 表上實(shí)現(xiàn)精細(xì)化的 ro 表 base file 切分,在 compaction 的時候通過 watermark 切分日志文件,保證 snapshot 的精確性。也就是說,我們可以在流式寫入的基礎(chǔ)上,給下游的離線處理提供精確到 0 點(diǎn)的數(shù)據(jù)。
目前我們正在將整體功能通過 RFC-61 貢獻(xiàn)回社區(qū),實(shí)際落地過程的收益前面章節(jié)已有介紹,這里不再贅述。
4.2 多源 Partial update
前文簡單介紹了多源部分列更新(大寬表拼接)的場景,我們依賴 Hudi 的多源合并能力在存儲層實(shí)現(xiàn) Join 的操作,大大降低了計(jì)算層在 state 和 shuffle 上的壓力。
目前,我們主要是通過 Hudi 內(nèi)部的 Payload 接口實(shí)現(xiàn)多源的部分列更新。下面這張圖展示了 Payload 在 Hudi 的寫端和讀端的交互流程。
![]()
實(shí)現(xiàn)的原理基本上就是通過自定義的 Payload class 來實(shí)現(xiàn)相同 key 不同源數(shù)據(jù)的合并邏輯,寫端會在批次內(nèi)做多源的合并并寫入 log,讀端在讀時合并時也會調(diào)用相同的邏輯來處理跨批次的情況。
這里需要注意的是亂序和遲到數(shù)據(jù)(out-of-order and late events)的問題。如果不做處理,在下游經(jīng)常會導(dǎo)致舊數(shù)據(jù)覆蓋新數(shù)據(jù),或者列更新不完整的情況。
針對亂序和遲到數(shù)據(jù),我們對 Hudi 做了 Multiple ordering value 的增強(qiáng),保證每個源只能更新屬于自己那部分列的數(shù)據(jù),并且可以根據(jù)設(shè)置的 event time (ordering value)列,確保只會讓新數(shù)據(jù)覆蓋舊數(shù)據(jù)。
后續(xù)我們還準(zhǔn)備結(jié)合 lock less multiple writers 來實(shí)現(xiàn)多 Job 多源的并發(fā)寫入。
5. 總結(jié)與展望
針對在 Shopee 數(shù)據(jù)系統(tǒng)建設(shè)中面臨的問題,我們提出了湖倉一體的解決方案,通過對比選型選擇了 Hudi 作為核心組件。
在落地過程中,我們通過使用 Hudi 的核心特性以及在此之上的擴(kuò)展改造,分別滿足了三個主要的用戶需求場景:實(shí)時數(shù)據(jù)集成、增量視圖和增量計(jì)算。并為用戶帶來了低延時(約 10 分鐘)、降低計(jì)算資源消耗、降低存儲消耗等收益。
接下來,我們還將提供更多特性,并針對以下兩個方面做進(jìn)一步完善,從而滿足用戶更多的場景,提供更好的性能。
5.1 跨任務(wù)并發(fā)寫支持 ![]()
當(dāng)前 Hudi 支持了基于文件鎖的單個任務(wù)單 writer 的寫入方式。
但是在實(shí)際中,有一些場景需要多個任務(wù)多 writer 同時寫入,且寫入分區(qū)有交叉,目前的 OCC 對這種情況支持不佳。目前我們正在與社區(qū)合作解決 Flink 與 Spark 多重 writer 的場景。
5.2 性能優(yōu)化 ![]()
元數(shù)據(jù)讀取以及 File listing 操作無論是在寫入端還是讀取端都會有很大的性能消耗,海量的分區(qū)對外部元數(shù)據(jù)系統(tǒng)(比如 HMS)也會造成很大壓力。
針對這一問題,我們計(jì)劃第一步將 schema 之外的信息存儲從 HMS 過渡到 MDT;第二步是在未來使用一個獨(dú)立的 MetaStore 和 Table service 的 server,不再強(qiáng)耦合于 HDFS。
在這個 server 中,我們可以更容易地優(yōu)化讀取性能,更靈活地進(jìn)行資源調(diào)整。






