1. 實時需求日趨迫切
目前各大公司的產(chǎn)品需求和內(nèi)部決策對于數(shù)據(jù)實時性的要求越來越迫切, ?需要實時數(shù)倉的能?來賦能 。傳統(tǒng)離 線數(shù)倉的數(shù)據(jù)時效性是 T+1,調(diào)度頻率以天為單位,?法?撐實時場景的數(shù)據(jù)需求 。即使能將調(diào)度頻率設(shè)置成 ?時,也只能解決部分時效性要求不高的場景,對于實效性要求很高的場景還是?法優(yōu)雅的?撐 。因此實時使 用數(shù)據(jù)的問題必須得到有效解決。
2. 實時技術(shù)日趨成熟
實時計算框架已經(jīng)經(jīng)歷了三代發(fā)展,分別是:Storm 、SparkStreaming 、Flink,計算框架越來越成熟。
???, ?實時任務(wù)的開發(fā)已經(jīng)能通過編寫 SQL 的?式來完成,在技術(shù)層?能很好地繼承離線數(shù)倉的架構(gòu)設(shè)計 思想;
另??? ,在線數(shù)據(jù)開發(fā)平臺所提供的功能對實時任務(wù)開發(fā) 、調(diào)試 、運維的?持也?漸趨于成熟, ?開發(fā)成本逐 步降低,有助于去做這件事。
二、實時數(shù)倉建設(shè)目的
1. 解決傳統(tǒng)數(shù)倉的問題
從目前數(shù)倉建設(shè)的現(xiàn)狀來看, ?實時數(shù)倉是?個容易讓?產(chǎn)生混淆的概念,根據(jù)傳統(tǒng)經(jīng)驗分析,數(shù)倉有?個重要 的功能, ?即能夠記錄歷史 。通常,數(shù)倉都是希望從業(yè)務(wù)上線的第?天開始有數(shù)據(jù),然后?直記錄到現(xiàn)在。
但實時流處理技術(shù), ??是強調(diào)當(dāng)前處理狀態(tài)的?個技術(shù),結(jié)合當(dāng)前?線大?的建設(shè)經(jīng)驗和滴滴在該領(lǐng)域的建設(shè) 現(xiàn)狀,我們嘗試把公司內(nèi)實時數(shù)倉建設(shè)的目的定位為, ?以數(shù)倉建設(shè)理論和實時技術(shù),解決由于當(dāng)前離線數(shù)倉數(shù) 據(jù)時效性低解決不了的問題。
現(xiàn)階段我們要建設(shè)實時數(shù)倉的主要原因是:
公司業(yè)務(wù)對于數(shù)據(jù)的實時性越來越迫切, ?需要有實時數(shù)據(jù)來輔助完成決策;
實時數(shù)據(jù)建設(shè)沒有規(guī)范,數(shù)據(jù)可用性較差,?法形成數(shù)倉體系, ?資源大量浪費;
數(shù)據(jù)平臺?具對整體實時開發(fā)的?持也?漸趨于成熟, ?開發(fā)成本降低。
2. 實時數(shù)倉的應(yīng)用場景
實時 OLAP 分析;
實時數(shù)據(jù)看板;
實時業(yè)務(wù)監(jiān)控;
實時數(shù)據(jù)接?服務(wù)。
三、實時數(shù)倉建設(shè)方案
接下來我們分析下目前實時數(shù)倉建設(shè)比較好的?個案例,希望這些案例能夠給?家?guī)?些啟發(fā)。
1. 滴滴順風(fēng)車實時數(shù)倉案例
滴滴數(shù)據(jù)團隊建設(shè)的實時數(shù)倉,基本滿足了順風(fēng)車業(yè)務(wù)方在實時側(cè)的各類業(yè)務(wù)需求,初步建立起順風(fēng)車實時數(shù) 倉,完成了整體數(shù)據(jù)分層, ?包含明細(xì)數(shù)據(jù)和匯總數(shù)據(jù),統(tǒng)?了 DWD 層, ?降低了?數(shù)據(jù)資源消耗,提高了數(shù)據(jù)
復(fù)用性,可對外輸出豐富的數(shù)據(jù)服務(wù)。
數(shù)倉具體架構(gòu)如下圖所示:

從數(shù)據(jù)架構(gòu)圖來看,順風(fēng)車實時數(shù)倉和對應(yīng)的離線數(shù)倉有很多類似的地方 。例如分層結(jié)構(gòu);?比如 ODS 層, ?明 細(xì)層,匯總層, ?乃至應(yīng)用層,他們命名的模式可能都是?樣的 。但仔細(xì)比較不難發(fā)現(xiàn),兩者有很多區(qū)別:
與離線數(shù)倉相比, ?實時數(shù)倉的層次更少?些
從目前建設(shè)離線數(shù)倉的經(jīng)驗來看,數(shù)倉的數(shù)據(jù)明細(xì)層內(nèi)容會非常豐富,處理明細(xì)數(shù)據(jù)外?般還會包含輕度 匯總層的概念, ?另外離線數(shù)倉中應(yīng)用層數(shù)據(jù)在數(shù)倉內(nèi)部,但實時數(shù)倉中,app 應(yīng)用層數(shù)據(jù)已經(jīng)落?應(yīng)用系 統(tǒng)的存儲介質(zhì)中,可以把該層與數(shù)倉的表分離;
應(yīng)用層少建設(shè)的好處:實時處理數(shù)據(jù)的時候,每建?個層次,數(shù)據(jù)必然會產(chǎn)生?定的延遲;
匯總層少建的好處:在匯總統(tǒng)計的時候,往往為了容忍?部分?jǐn)?shù)據(jù)的延遲,可能會?為的制造?些延遲來 保證數(shù)據(jù)的準(zhǔn)確 。舉例,在統(tǒng)計跨天相關(guān)的訂單事件中的數(shù)據(jù)時,可能會等到 00?00?05 或者 00?00? 10
再統(tǒng)計,確保 00?00 前的數(shù)據(jù)已經(jīng)全部接受到位了,再進?統(tǒng)計 。所以,匯總層的層次太多的話,就會更 大的加重?為造成的數(shù)據(jù)延遲。
與離線數(shù)倉相比, ?實時數(shù)倉的數(shù)據(jù)源存儲不同
在建設(shè)離線數(shù)倉的時候, ? 目前滴滴內(nèi)部整個離線數(shù)倉都是建?在 Hive 表之上 。但是,在建設(shè)實時數(shù)倉的 時候, ?同?份表,會使用不同的方式進?存儲 。比如常?的情況下, ?明細(xì)數(shù)據(jù)或者匯總數(shù)據(jù)都會存在
Kafka 里? ,但是像城市 、渠道等維度信息需要借助 Hbase, ?mysql 或者其他 KV 存儲等數(shù)據(jù)庫來進?存 儲。
接下來,根據(jù)順?車實時數(shù)倉架構(gòu)圖,對每?層建設(shè)做具體展開:
實時數(shù)倉分層建設(shè)
1. ODS 貼源層建設(shè)
根據(jù)順?車具體場景, ? 目前順?車數(shù)據(jù)源主要包括訂單相關(guān)的 binlog 日志, ?冒泡和安全相關(guān)的 public 日志, 流量相關(guān)的埋點日志等。
這些數(shù)據(jù)部分已采集寫? kafka 或 ddmq 等數(shù)據(jù)通道中,部分?jǐn)?shù)據(jù)需要借助內(nèi)部自研同步?具完成采集, ?最終 基于順?車數(shù)倉 ods 層建設(shè)規(guī)范分主題統(tǒng)?寫? kafka 存儲介質(zhì)中。
命名規(guī)范:ODS 層實時數(shù)據(jù)源主要包括兩種。
?種是在離線采集時已經(jīng)自動生產(chǎn)的 DDMQ 或者是 Kafkatopic, ?這類型的數(shù)據(jù)命名方式為采集系統(tǒng)自動
生成規(guī)范為:cn-binlog-數(shù)據(jù)庫名-數(shù)據(jù)庫名 eg:? cn-binlog-ihap_fangyuan-ihap_fangyuan ??種是需要自?進?采集同步到 kafkatopic 中,生產(chǎn)的 topic 命名規(guī)范同離線類似:ODS 層采 用:? realtime_ods_binlog_{源系統(tǒng)庫/表名}/ods_log_{日志名} eg:
realtime_ods_binlog_ihap_fangyuan
2. DWD 明細(xì)層建設(shè)
根據(jù)順?車業(yè)務(wù)過程作為建模驅(qū)動,基于每個具體的業(yè)務(wù)過程特點,構(gòu)建最細(xì)粒度的明細(xì)層事實表;結(jié)合順? 車分析師在離線側(cè)的數(shù)據(jù)使用特點,將明細(xì)事實表的某些重要維度屬性字段做適當(dāng)冗余,完成寬表化處理, ?之 后基于當(dāng)前順?車業(yè)務(wù)方對實時數(shù)據(jù)的需求重點, ?重點建設(shè)交易 、財務(wù) 、體驗 、安全 、流量等?大模塊;該層 的數(shù)據(jù)來源于 ODS 層,通過大數(shù)據(jù)架構(gòu)提供的 Stream SQL 完成 ETL ?作,對于 binlog 日志的處理主要進? 簡單的數(shù)據(jù)清洗 、處理數(shù)據(jù)漂移和數(shù)據(jù)亂序, ?以及可能對多個 ODS 表進? Stream Join,對于流量日志主要是 做通用的 ETL 處理和針對順?車場景的數(shù)據(jù)過濾,完成?結(jié)構(gòu)化數(shù)據(jù)的結(jié)構(gòu)化處理和數(shù)據(jù)的分流;該層的數(shù)據(jù) 除了存儲在消息隊列 Kafka 中,通常也會把數(shù)據(jù)實時寫? Druid 數(shù)據(jù)庫中,供查詢明細(xì)數(shù)據(jù)和作為簡單匯總數(shù) 據(jù)的加?數(shù)據(jù)源。
命名規(guī)范:DWD 層的表命名使用英文?寫字母, ?單詞之間用下劃線分開,總?度不能超過 40 個字符,并且應(yīng) 遵循下述規(guī)則:? realtime_dwd_{業(yè)務(wù)/pub}_{數(shù)據(jù)域縮寫}_ [{業(yè)務(wù)過程縮寫}]_ [{自定義表命名標(biāo)簽縮寫}]
{業(yè)務(wù)/pub}:參考業(yè)務(wù)命名
{數(shù)據(jù)域縮寫}:參考數(shù)據(jù)域劃分部分
{自定義表命名標(biāo)簽縮寫}:實體名稱可以根據(jù)數(shù)據(jù)倉庫轉(zhuǎn)換整合后做?定的業(yè)務(wù)抽象的名稱,該名稱應(yīng)該
準(zhǔn)確表述實體所代表的業(yè)務(wù)含義
樣例:realtime_dwd_trip_trd_order_base
3. DIM 層
公共維度層,基于維度建模理念思想, ?建?整個業(yè)務(wù)過程的?致性維度, ?降低數(shù)據(jù)計算?徑和算法不統(tǒng)? ?險;
DIM 層數(shù)據(jù)來源于兩部分:?部分是 Flink 程序?qū)崟r處理 ODS 層數(shù)據(jù)得到, ?另外?部分是通過離線任務(wù) 出倉得到;
DIM 層維度數(shù)據(jù)主要使用 MySQL 、Hbase 、fusion(滴滴自研 KV 存儲) 三種存儲引擎,對于維表數(shù)據(jù)比較 少的情況可以使用 MySQL,對于單條數(shù)據(jù)大?比較? ,查詢 QPS 比較高的情況,可以使用 fusion 存
儲, ?降低機器內(nèi)存資源占用,對于數(shù)據(jù)量比較大,對維表數(shù)據(jù)變化不是特別敏感的場景,可以使用 HBase 存儲。
命名規(guī)范:DIM 層的表命名使用英??寫字母, ?單詞之間用下劃線分開,總?度不能超過 30 個字符,并且應(yīng) 遵循下述規(guī)則:? dim_{業(yè)務(wù)/pub}_{維度定義}[_{自定義命名標(biāo)簽}] :
{業(yè)務(wù)/pub}:參考業(yè)務(wù)命名
{維度定義}:參考維度命名
{自定義表命名標(biāo)簽縮寫}:實體名稱可以根據(jù)數(shù)據(jù)倉庫轉(zhuǎn)換整合后做?定的業(yè)務(wù)抽象的名稱,該名稱應(yīng)該
準(zhǔn)確表述實體所代表的業(yè)務(wù)含義
樣例:dim_trip_dri_base
4. DWM 匯總層建設(shè)
在建設(shè)順?車實時數(shù)倉的匯總層的時候,跟順?車離線數(shù)倉有很多?樣的地? ,但其具體技術(shù)實現(xiàn)會存在很大 不同。
第?:?對于?些共性指標(biāo)的加?, ?比如 pv, ?uv,訂單業(yè)務(wù)過程指標(biāo)等,我們會在匯總層進?統(tǒng)?的運算,確 ?保關(guān)于指標(biāo)的?徑是統(tǒng)?在?個固定的模型中完成 。對于?些個性指標(biāo),從指標(biāo)復(fù)用性的?度出發(fā),確定唯? 的時間字段, ?同時該字段盡可能與其他指標(biāo)在時間維度上完成拉齊,例如?中異常訂單數(shù)需要與交易域指標(biāo)在 事件時間上做到拉齊。
第?:在順?車匯總層建設(shè)中, ?需要進?多維的主題匯總, ?因為實時數(shù)倉本身是?向主題的,可能每個主題會 關(guān)?的維度都不?樣,所以需要在不同的主題下, ?按照這個主題關(guān)?的維度對數(shù)據(jù)進?匯總, ?最后來算業(yè)務(wù)? 需要的匯總指標(biāo) 。在具體操作中,對于 pv 類指標(biāo)使用 Stream SQL 實現(xiàn) 1 分鐘匯總指標(biāo)作為最?匯總單位指 標(biāo),在此基礎(chǔ)上進?時間維度上的指標(biāo)累加;對于 uv 類指標(biāo)直接使用druid 數(shù)據(jù)庫作為指標(biāo)匯總?cè)萜鳎鶕?jù) 業(yè)務(wù)?對匯總指標(biāo)的及時性和準(zhǔn)確性的要求, ?實現(xiàn)相應(yīng)的精確去重和?精確去重。
第三:?匯總層建設(shè)過程中, ?還會涉及到衍生維度的加? 。在順?車券相關(guān)的匯總指標(biāo)加?中我們使用 Hbase 的版本機制來構(gòu)建?個衍生維度的拉鏈表,通過事件流和 Hbase 維表關(guān)聯(lián)的?式得到實時數(shù)據(jù)當(dāng)時的準(zhǔn)確維 度
命名規(guī)范:DWM 層的表命名使用英??寫字母, ?單詞之間用下劃線分開,總?度不能超過 40 個字符,并且 應(yīng)遵循下述規(guī)則:? realtime_dwm_{業(yè)務(wù)/pub}_{數(shù)據(jù)域縮寫}_{數(shù)據(jù)主粒度縮寫}_ [{自定義表命名標(biāo)簽縮寫}]_{統(tǒng)計時 間周期范圍縮寫} :
{業(yè)務(wù)/pub}:參考業(yè)務(wù)命名
{數(shù)據(jù)域縮寫}:參考數(shù)據(jù)域劃分部分
{數(shù)據(jù)主粒度縮寫}:指數(shù)據(jù)主要粒度或數(shù)據(jù)域的縮寫,也是聯(lián)合主鍵中的主要維度
{自定義表命名標(biāo)簽縮寫}:實體名稱可以根據(jù)數(shù)據(jù)倉庫轉(zhuǎn)換整合后做?定的業(yè)務(wù)抽象的名稱,該名稱應(yīng)該 準(zhǔn)確表述實體所代表的業(yè)務(wù)含義
{統(tǒng)計時間周期范圍縮寫}:1d:天增量;td:天累計(全量);1h:小時增量;th:小時累計(全量);1min:分鐘增 量;tmin:分鐘累計(全量)
樣例:? realtime_dwm_trip_trd_pas_bus_accum_1min
5. APP 應(yīng)用層
該層主要的工作是把實時匯總數(shù)據(jù)寫?應(yīng)用系統(tǒng)的數(shù)據(jù)庫中, ?包括用于大屏顯示和實時 OLAP 的 Druid 數(shù)據(jù)庫 (該數(shù)據(jù)庫除了寫?應(yīng)用數(shù)據(jù),也可以寫?明細(xì)數(shù)據(jù)完成匯總指標(biāo)的計算)中,用于實時數(shù)據(jù)接?服務(wù)的 Hbase 數(shù)據(jù)庫,用于實時數(shù)據(jù)產(chǎn)品的 mysql 或者 redis 數(shù)據(jù)庫中。
命名規(guī)范:基于實時數(shù)倉的特殊性不做硬性要求。
順風(fēng)車實時數(shù)倉建設(shè)成果
截止目前,? 共為順風(fēng)車業(yè)務(wù)線建立了增長 、交易 、體驗 、安全 、財務(wù)五大模塊,涉及 40+ 的實時看板,涵蓋 順風(fēng)車全部核?業(yè)務(wù)過程, ?實時和離線數(shù)據(jù)誤差<0.5%, ?是順風(fēng)車業(yè)務(wù)線數(shù)據(jù)分析?面的有利補充,為順風(fēng)車 當(dāng)天發(fā)券動態(tài)策略調(diào)整, ?司乘安全相關(guān)監(jiān)控, ?實時訂單趨勢分析等提供了實時數(shù)據(jù)?持,提高了決策的時效 ? ? 性。
同時建立在數(shù)倉模型之上的實時指標(biāo)能根據(jù)用戶需求及時完成?徑變更和實時離線數(shù)據(jù)?致性校驗,大大提高 了實時指標(biāo)的開發(fā)效率和實時數(shù)據(jù)的準(zhǔn)確性,也為公司內(nèi)部大范圍建設(shè)實時數(shù)倉提供了有?的理論和實踐? ? 持。
2. 快手實時數(shù)倉場景化案例


1. 目標(biāo)
首先由于是做數(shù)倉, ?因此希望所有的實時指標(biāo)都有離線指標(biāo)去對應(yīng),要求實時指標(biāo)和離線指標(biāo)整體的數(shù)據(jù) 差異在 1% 以內(nèi), ?這是最低標(biāo)準(zhǔn)。
其次是數(shù)據(jù)延遲,其 SLA 標(biāo)準(zhǔn)是活動期間所有核?報表場景的數(shù)據(jù)延遲不能超過 5 分鐘, ?這 5 分鐘包括 作業(yè)掛掉之后和恢復(fù)時間,如果超過則意味著 SLA 不達標(biāo)。
最后是穩(wěn)定性,針對?些場景, ?比如作業(yè)重啟后,我們的曲線是正常的,不會因為作業(yè)重啟導(dǎo)致指標(biāo)產(chǎn)出 ?些明顯的異常。
2. 難點
第?個難點是數(shù)據(jù)量大 。每天整體的?口流量數(shù)據(jù)量級大概在萬億級 。在活動如春晚的場景, ?QPS 峰值能 達到億 / 秒。
第?個難點是組件依賴比較復(fù)雜 。可能這條鏈路里有的依賴于 Kafka,有的依賴 Flink, ?還有?些依賴 KV 存儲 、RPC 接口 、OLAP 引擎等,我們需要思考在這條鏈路里如何分布,才能讓這些組件都能正常工作。
第三個難點是鏈路復(fù)雜 。目前我們有 200+ 核?業(yè)務(wù)作業(yè), ?50+ 核?數(shù)據(jù)源,整體作業(yè)超過 1000。
2) 實時數(shù)倉 - 分層模型
基于上面三個難點,來看?下數(shù)倉架構(gòu):

如上所示:
最下層有三個不同的數(shù)據(jù)源,分別是客戶端日志 、服務(wù)端日志以及 Binlog 日志;
在公共基礎(chǔ)層分為兩個不同的層次,?個是 DWD 層,做明細(xì)數(shù)據(jù), ?另?個是 DWS 層,做公共聚合數(shù)
據(jù), ?DIM 是我們常說的維度 。我們有?個基于離線數(shù)倉的主題預(yù)分層, ?這個主題預(yù)分層可能包括流量 、用 戶 、設(shè)備 、視頻的生產(chǎn)消費 、風(fēng)控 、社交等。
DWD 層的核?工作是標(biāo)準(zhǔn)化的清洗;
DWS 層是把維度的數(shù)據(jù)和 DWD 層進行關(guān)聯(lián),關(guān)聯(lián)之后生成?些通用粒度的聚合層次。
再往上是應(yīng)用層, ?包括?些大盤的數(shù)據(jù), ?多維分析的模型以及業(yè)務(wù)專題數(shù)據(jù);
最上面是場景。
整體過程可以分為三步:
第?步是做業(yè)務(wù)數(shù)據(jù)化,相當(dāng)于把業(yè)務(wù)的數(shù)據(jù)接進來;
第?步是數(shù)據(jù)資產(chǎn)化,意思是對數(shù)據(jù)做很多的清洗,然后形成?些規(guī)則有序的數(shù)據(jù);
第三步是數(shù)據(jù)業(yè)務(wù)化,可以理解數(shù)據(jù)在實時數(shù)據(jù)層面可以反哺業(yè)務(wù),為業(yè)務(wù)數(shù)據(jù)價值建設(shè)提供?些賦能。
3) 實時數(shù)倉 - 保障措施
基于上面的分層模型,來看?下整體的保障措施:

保障層面分為三個不同的部分,分別是質(zhì)量保障, ?時效保障以及穩(wěn)定保障。
我們先看藍色部分的質(zhì)量保障 。針對質(zhì)量保障,可以看到在數(shù)據(jù)源階段,做了如數(shù)據(jù)源的亂序監(jiān)控, ?這是
我們基于自?的 SDK 的采集做的, ?以及數(shù)據(jù)源和離線的?致性校準(zhǔn) 。研發(fā)階段的計算過程有三個階段, 分別是研發(fā)階段 、上線階段和服務(wù)階段。
研發(fā)階段可能會提供?個標(biāo)準(zhǔn)化的模型,基于這個模型會有?些 Benchmark,并且做離線的比對驗
證,保證質(zhì)量是?致的;
上線階段更多的是服務(wù)監(jiān)控和指標(biāo)監(jiān)控;
在服務(wù)階段,如果出現(xiàn)?些異常情況,先做 Flink 狀態(tài)拉起,如果出現(xiàn)了?些不符合預(yù)期的場景,我
們會做離線的整體數(shù)據(jù)修復(fù)。
第?個是時效性保障 。針對數(shù)據(jù)源,我們把數(shù)據(jù)源的延遲情況也納?監(jiān)控 。在研發(fā)階段其實還有兩個事
情:
首先是壓測, ?常規(guī)的任務(wù)會拿最近 7 天或者最近 14 天的峰值流量去看它是否存在任務(wù)延遲的情況; ?通過壓測之后,會有?些任務(wù)上線和重啟性能評估,相當(dāng)于按照 CP 恢復(fù)之后, ?重啟的性能是什么樣
? 。
最后?個是穩(wěn)定保障, ?這在大型活動中會做得比較多, ?比如切換演練和分級保障 。我們會基于之前的壓測
結(jié)果做限流, ? 目的是保障作業(yè)在超過極限的情況下, ?仍然是穩(wěn)定的,不會出現(xiàn)很多的不穩(wěn)定或者 CP 失敗 的情況 。之后我們會有兩種不同的標(biāo)準(zhǔn),? 種是冷備雙機房, ?另外?種是熱備雙機房。
冷備雙機房是:?當(dāng)?個單機房掛掉,我們會從另?個機房去拉起;
熱備雙機房:相當(dāng)于同樣?份邏輯在兩個機房各部署?次。
以上就是我們整體的保障措施。
4) 快手場景問題及解決方案
1. PV/UV 標(biāo)準(zhǔn)化
1.1 場景
第?個問題是 PV/UV 標(biāo)準(zhǔn)化, ?這里有三個截圖:

第?張圖是春晚活動的預(yù)熱場景,相當(dāng)于是?種玩法,第二和第三張圖是春晚當(dāng)天的發(fā)紅包活動和直播間截 圖。
在活動進行過程中,我們發(fā)現(xiàn) 60~70% 的需求是計算頁面里的信息,如:
這個頁面來了多少人,或者有多少人點擊進入這個頁面;
活動?共來了多少人;
頁面里的某?個掛件,獲得了多少點擊 、產(chǎn)生了多少曝光。
1.2 方案
抽象?下這個場景就是下面這種 SQL:

簡單來說,就是從?張表做篩選條件,然后按照維度層面做聚合,接著產(chǎn)生?些 Count 或者 Sum 操作。基于這種場景,我們最開始的解決方案如上圖右邊所示。
我們用到了 Flink SQL 的 Early Fire 機制,從 Source 數(shù)據(jù)源取數(shù)據(jù), ?之后做了 DID 的分桶 。比如最開始紫色 的部分按這個做分桶,先做分桶的原因是防止某?個 DID 存在熱點的問題 。分桶之后會有?個叫做 Local ? ? ?Window Agg 的東西,相當(dāng)于數(shù)據(jù)分完桶之后把相同類型的數(shù)據(jù)相加 。Local Window Agg 之后再按照維度進 行 Global Window Agg 的合桶,合桶的概念相當(dāng)于按照維度計算出最終的結(jié)果 。Early Fire 機制相當(dāng)于在 ? ? ?Local Window Agg 開?個天級的窗? ,然后每分鐘去對外輸出?次。
這個過程中我們遇到了?些問題,如上圖左下角所示。
在代碼正常運行的情況下是沒有問題的,但如果整體數(shù)據(jù)存在延遲或者追溯歷史數(shù)據(jù)的情況, ?比如?分鐘 ? ? ?Early Fire ?次, ?因為追溯歷史的時候數(shù)據(jù)量會比較大,所以可能導(dǎo)致 14?00 追溯歷史, ?直接讀到了 14?02 的 數(shù)據(jù),而 14?01 的那個點就被丟掉了,丟掉了以后會發(fā)生什么?

在這種場景下, ?圖中上方的曲線為 Early Fire 回溯歷史數(shù)據(jù)的結(jié)果 。橫坐標(biāo)是分鐘,縱坐標(biāo)是截止到當(dāng)前時刻 的頁面 UV,我們發(fā)現(xiàn)有些點是橫著的,意味著沒有數(shù)據(jù)結(jié)果,然后?個陡增,然后?橫著的,接著??個陡 增,而這個曲線的預(yù)期結(jié)果其實是圖中下方那種平滑的曲線。
為了解決這個問題,我們用到了 Cumulate Window 的解決方案, ?這個解決方案在 Flink 1.13 版本里也有涉 及,其原理是?樣的。

數(shù)據(jù)開?個大的天級窗口,大窗口下又開了?個小的分鐘級窗口,數(shù)據(jù)按數(shù)據(jù)本身的 Row Time 落到分鐘級窗 口。
Watermark 推進過了窗口的 event_time, ?它會進行?次下發(fā)的觸發(fā),通過這種方式可以解決回溯的問 題,數(shù)據(jù)本身落在真實的窗口, ? Watermark 推進,在窗口結(jié)束后觸發(fā)。
此外, ?這種方式在?定程度上能夠解決亂序的問題 。比如它的亂序數(shù)據(jù)本身是?個不丟棄的狀態(tài),會記錄 到最新的累計數(shù)據(jù)。
最后是語義?致性, ?它會基于事件時間,在亂序不嚴(yán)重的情況下, ?和離線計算出來的結(jié)果?致性是相當(dāng)高 的。
2. DAU 計算
2.1 背景介紹
下面介紹?下 DAU 計算:

我們對于整個大盤的活躍設(shè)備 、新增設(shè)備和回流設(shè)備有比較多的監(jiān)控。
活躍設(shè)備指的是當(dāng)天來過的設(shè)備;
新增設(shè)備指的是當(dāng)天來過且歷史沒有來過的設(shè)備;
回流設(shè)備指的是當(dāng)天來過且 N 天內(nèi)沒有來過的設(shè)備。
但是我們計算過程之中可能需要 5~8 個這樣不同的 Topic 去計算這?個指標(biāo)。
我們看?下離線過程中,邏輯應(yīng)該怎么算。
首先我們先算活躍設(shè)備,把這些合并到?起,然后做?個維度下的天級別去重,接著再去關(guān)聯(lián)維度表, ?這個維 度表包括設(shè)備的首末次時間,就是截止到昨天設(shè)備首次訪問和末次訪問的時間。
得到這個信息之后,我們就可以進?邏輯計算,然后我們會發(fā)現(xiàn)新增和回流的設(shè)備其實是活躍設(shè)備里打的?個 ?標(biāo)簽 。新增設(shè)備就是做了?個邏輯處理, ?回流設(shè)備是做了 30 天的邏輯處理,基于這樣的解決方案,我們能 否簡單地寫?個 SQL 去解決這個問題?
其實我們最開始是這么做的,但遇到了?些問題:
第?個問題是:數(shù)據(jù)源是 6~8 個, ?而且我們大盤的?徑經(jīng)常會做微調(diào),如果是單作業(yè)的話,每次微調(diào)的過 程之中都要改, ?單作業(yè)的穩(wěn)定性會非常差;
第?個問題是:數(shù)據(jù)量是萬億級, ?這會導(dǎo)致兩個情況, ?首先是這個量級的單作業(yè)穩(wěn)定性非常差,其次是實 時關(guān)聯(lián)維表的時候用的 KV 存儲,任何?個這樣的 RPC 服務(wù)接? ,都不可能在萬億級數(shù)據(jù)量的場景下保證 服務(wù)穩(wěn)定性;
第三個問題是:我們對于時延要求比較高,要求時延?于?分鐘 。整個鏈路要避免批處理,如果出現(xiàn)了? 些任務(wù)性能的單點問題,我們還要保證高性能和可擴容。
2.2 技術(shù)方案
針對以上問題,介紹?下我們是怎么做的:

如上圖的例? ,第?步是對 A B C 這三個數(shù)據(jù)源,先按照維度和 DID 做分鐘級別去重,分別去重之后得到三個 分鐘級別去重的數(shù)據(jù)源,接著把它們 Union 到?起,然后再進行同樣的邏輯操作。
這相當(dāng)于我們數(shù)據(jù)源的??從萬億變到了百億的級別,分鐘級別去重之后再進行?個天級別的去重,產(chǎn)生的數(shù) 據(jù)源就可以從百億變成了??億的級別。
在??億級別數(shù)據(jù)量的情況下, ?我們再去關(guān)聯(lián)數(shù)據(jù)服務(wù)化, ?這就是?種比較可行的狀態(tài),相當(dāng)于去關(guān)聯(lián)用戶畫 像的 RPC 接? ,得到 RPC 接?之后, ?最終寫?到了目標(biāo) Topic。這個目標(biāo) Topic 會導(dǎo)?到 OLAP 引擎,供給
多個不同的服務(wù), ?包括移動版服務(wù),大屏服務(wù),指標(biāo)看板服務(wù)等。
這個?案有三個?面的優(yōu)勢,分別是穩(wěn)定性 、時效性和準(zhǔn)確性。
首先是穩(wěn)定性 。松耦合可以簡單理解為當(dāng)數(shù)據(jù)源 A 的邏輯和數(shù)據(jù)源 B 的邏輯需要修改時,可以單獨修改。
第?是任務(wù)可擴容, ?因為我們把所有邏輯拆分得非常細(xì)粒度, ?當(dāng)?些地?出現(xiàn)了如流量問題,不會影響后 面的部分,所以它擴容比較簡單, ?除此之外還有服務(wù)化后置和狀態(tài)可控。
其次是時效性,我們做到毫秒延遲,并且維度豐富,整體上有 20+ 的維度做多維聚合。最后是準(zhǔn)確性,我們?持?jǐn)?shù)據(jù)驗證 、實時監(jiān)控 、模型出?統(tǒng)?等。
此時我們遇到了另外?個問題 - 亂序 。對于上?三個不同的作業(yè), ?每?個作業(yè)重啟至少會有兩分鐘左右的延 遲,延遲會導(dǎo)致下游的數(shù)據(jù)源 Union 到?起就會有亂序。
2.3 延遲計算?案
遇到上面這種有亂序的情況下, ?我們要怎么處理?

我們總共有三種處理方案:
第?種解決方案是用“did + 維度 + 分鐘”進行去重,Value 設(shè)為“是否來過”。比如同?個 did, ?04:01 來
了?條, ?它會進行結(jié)果輸出 。同樣的, ?04:02 和 04?04 也會進行結(jié)果輸出 。但如果 04:01 再來, ?它就會 丟棄,但如果 04?00 來,依舊會進行結(jié)果輸出。
這個解決方案存在?些問題, ?因為我們按分鐘存,存 20 分鐘的狀態(tài)大?是存 10 分鐘的兩倍,到后面這個 狀態(tài)大?有點不太可控, ?因此我們?換了解決方案 2。
第?種解決方案,我們的做法會涉及到?個假設(shè)前提,就是假設(shè)不存在數(shù)據(jù)源亂序的情況 。在這種情況 下, ?key 存的是“did + 維度”,Value 為“時間戳”, ?它的更新方式如上圖所示。
04?01 來了?條數(shù)據(jù), ?進行結(jié)果輸出 。04:02 來了?條數(shù)據(jù),如果是同?個 did,那么它會更新時間戳, ?然后仍然做結(jié)果輸出 。04:04 也是同樣的邏輯,然后將時間戳更新到 04:04,如果后面來了?條 04:
01 的數(shù)據(jù), ?它發(fā)現(xiàn)時間戳已經(jīng)更新到 04?04, ?它會丟棄這條數(shù)據(jù)。
這樣的做法大幅度減少了本身所需要的?些狀態(tài),但是對亂序是零容忍,不允許發(fā)生任何亂序的情況, ?由 于我們不好解決這個問題, ?因此我們?想出了解決方案 3。
方案 3 是在方案 2 時間戳的基礎(chǔ)之上, ?加了?個類似于環(huán)形緩沖區(qū),在緩沖區(qū)之內(nèi)允許亂序。
比如 04?01 來了?條數(shù)據(jù), ?進行結(jié)果輸出;04?02 來了?條數(shù)據(jù), ?它會把時間戳更新到 04?02,并且會記 錄同?個設(shè)備在 04?01 也來過 。如果 04?04 再來了?條數(shù)據(jù),就按照相應(yīng)的時間差做?個位移, ?最后通 ?過這樣的邏輯去保障它能夠容忍?定的亂序。
綜合來看這三個方案:
方案 1 在容忍 16 分鐘亂序的情況下, ?單作業(yè)的狀態(tài)大?在 480G 左右 。這種情況雖然保證了準(zhǔn)確性,但 是作業(yè)的恢復(fù)和穩(wěn)定性是完全不可控的狀態(tài), ?因此我們還是放棄了這個方案;
方案 2 是 30G 左右的狀態(tài)大? ,對于亂序 0 容忍,但是數(shù)據(jù)不準(zhǔn)確, ?由于我們對準(zhǔn)確性的要求非常高, 因此也放棄了這個方案;
方案 3 的狀態(tài)跟方案 1 相比, ?它的狀態(tài)雖然變化了但是增加的不多,而且整體能達到跟方案 1 同樣的效
果 。方案 3 容忍亂序的時間是 16 分鐘,我們正常更新?個作業(yè)的話, ?10 分鐘完全足夠重啟, ?因此最終選 擇了方案 3。
3. 運營場景
3.1 背景介紹

運營場景可分為四個部分:
第?個是數(shù)據(jù)大屏支持, ?包括單直播間的分析數(shù)據(jù)和大盤的分析數(shù)據(jù), ?需要做到分鐘級延遲,更新要求比
較高;
第?個是直播看板支持, ?直播看板的數(shù)據(jù)會有特定維度的分析,特定?群支持,對維度豐富性要求比較
高;
第三個是數(shù)據(jù)策略榜單, ?這個榜單主要是預(yù)測熱門作品 、爆款,要求的是?時級別的數(shù)據(jù),更新要求比較
低;
第四個是 C 端實時指標(biāo)展示,查詢量比較大,但是查詢模式比較固定。
下面進行分析這 4 種不同的狀態(tài)產(chǎn)生的?些不同的場景。

前 3 種基本沒有什么差別, ?只是在查詢模式上, ?有的是特定業(yè)務(wù)場景,有的是通用業(yè)務(wù)場景。
針對第 3 種和第 4 種, ?它對于更新的要求比較低,對于吞吐的要求比較高,過程之中的曲線也不要求有?致 性 。第 4 種查詢模式更多的是單實體的?些查詢, ?比如去查詢內(nèi)容,會有哪些指標(biāo),而且對 QPS 要求比較 ?高。
3.2 技術(shù)方案
針對上方 4 種不同的場景,我們是如何去做的?

首先看?下基礎(chǔ)明細(xì)層 (圖中左方),數(shù)據(jù)源有兩條鏈路,其中?條鏈路是消費的流, ?比如直播的消費信 息, ?還有觀看 / 點贊 / 評論 。經(jīng)過?輪基礎(chǔ)清洗,然后做維度管理 。上游的這些維度信息來源于 Kafka,
Kafka 寫?了?些內(nèi)容的維度,放到了 KV 存儲里邊, ?包括?些用戶的維度。
這些維度關(guān)聯(lián)了之后, ?最終寫? Kafka 的 DWD 事實層, ?這里為了做性能的提升,我們做了?級緩存的操 作。
如圖中上? ,我們讀取 DWD 層的數(shù)據(jù)然后做基礎(chǔ)匯總,核?是窗?維度聚合生成 4 種不同粒度的數(shù)據(jù),
分別是大盤多維匯總 topic 、直播間多維匯總 topic 、作者多維匯總 topic 、用戶多維匯總 topic, ?這些都是 通用維度的數(shù)據(jù)。
如圖中下? ,基于這些通用維度數(shù)據(jù),我們再去加?個性化維度的數(shù)據(jù),也就是 ADS 層 。拿到了這些數(shù)
據(jù)之后會有維度擴展, ?包括內(nèi)容擴展和運營維度的拓展,然后再去做聚合, ?比如會有電商實時 topic,機 構(gòu)服務(wù)實時 topic 和大 V 直播實時 topic。
分成這樣的兩個鏈路會有?個好處:?個地?處理的是通用維度, ?另?個地?處理的是個性化的維度 。通 用維度保障的要求會比較高?些,個性化維度則會做很多個性化的邏輯 。如果這兩個耦合在?起的話,會 發(fā)現(xiàn)任務(wù)經(jīng)常出問題,并且分不清楚哪個任務(wù)的職責(zé)是什么,構(gòu)建不出這樣的?個穩(wěn)定層。
如圖中右?, ?最終我們用到了三種不同的引擎 。簡單來說就是 Redis 查詢用到了 C 端的場景, ?OLAP 查詢 用到了大屏 、業(yè)務(wù)看板的場景。
5) 未來規(guī)劃
上??共講了三個場景,第?個場景是標(biāo)準(zhǔn)化 PU/UV 的計算,第?個場景是 DAU 整體的解決?案,第三個場 景是運營側(cè)如何解決 。基于這些內(nèi)容,我們有?些未來規(guī)劃,分為 4 個部分。

第?部分是實時保障體系完善:
??面做?些大型的活動, ?包括春晚活動以及后續(xù)常態(tài)化的活動 。針對這些活動如何去保障,我們有
?套規(guī)范去做平臺化的建設(shè);
第?個是分級保障標(biāo)準(zhǔn)制定, ?哪些作業(yè)是什么樣的保障級別 / 標(biāo)準(zhǔn),會有?個標(biāo)準(zhǔn)化的說明;
第三個是引擎平臺能?推動解決, ?包括 Flink 任務(wù)的?些引擎,在這上面我們會有?個平臺,基于這
個平臺去做規(guī)范 、標(biāo)準(zhǔn)化的推動。
第?部分是實時數(shù)倉內(nèi)容構(gòu)建:
??面是場景化?案的輸出, ?比如針對活動會有?些通用化的?案,而不是每次活動都開發(fā)?套新的
解決?案;
另??面是內(nèi)容數(shù)據(jù)層次沉淀, ?比如現(xiàn)在的數(shù)據(jù)內(nèi)容建設(shè),在厚度?面有?些場景的缺失, ?包括內(nèi)容
如何更好地服務(wù)于上游的場景。
第三部分是 Flink SQL 場景化構(gòu)建, ?包括 SQL 持續(xù)推? 、SQL 任務(wù)穩(wěn)定性和 SQL 任務(wù)資源利用率 。我們 在預(yù)估資源的過程中,會考慮比如在什么樣 QPS 的場景下, ? SQL 用什么樣的解決?案,能?撐到什么情
況 。Flink SQL 可以?幅減少?效,但是在這個過程中,我們想讓業(yè)務(wù)操作更加簡單。
第四部分是批流?體探索 。實時數(shù)倉的場景其實就是做離線 ETL 計算加速,我們會有很多?時級別的任
務(wù),針對這些任務(wù),每次批處理的時候有?些邏輯可以放到流處理去解決, ?這對于離線數(shù)倉 SLA 體系的提 升?分巨? 。
3. 騰訊看點實時數(shù)倉案例
騰訊看點業(yè)務(wù)為什么要構(gòu)建實時數(shù)倉??因為原始的上報數(shù)據(jù)量非常?,? 天上報峰值就有上萬億條 。而且上報 格式混亂 。缺乏內(nèi)容維度信息 、用戶畫像信息,下游沒辦法直接使用。
而我們提供的實時數(shù)倉, ?是根據(jù)騰訊看點信息流的業(yè)務(wù)場景, ?進行了內(nèi)容維度的關(guān)聯(lián),用戶畫像的關(guān)聯(lián),各種 粒度的聚合,下游可以非常?便的使用實時數(shù)據(jù),而且實時數(shù)據(jù)倉庫可以提供給下游的用戶反復(fù)的消費使用, 可以?量的減少重復(fù)的?作。
1) 方案選型

那就看下我們多維實時數(shù)據(jù)分析系統(tǒng)的?案選型,選型我們對比了行業(yè)內(nèi)的領(lǐng)先?案,選擇了最符合我們業(yè)務(wù) 場景的?案。
第?塊是實時數(shù)倉的選型,我們選擇的是業(yè)界比較成熟的 Lambda 架構(gòu),他的優(yōu)點是靈活性高 、容錯性
高 、成熟度高和遷移成本低;缺點是實時 、離線數(shù)據(jù)用兩套代碼,可能會存在?個?徑修改了, ?另?個沒 改的問題,我們每天都有做數(shù)據(jù)對賬的?作,如果有異常會進行告警。
第?塊是實時計算引擎選型, ?因為 Flink 設(shè)計之初就是為了流處理,SparkStreaming 嚴(yán)格來說還是微批處 理,Strom 用的已經(jīng)不多了 。再看 Flink 具有 Exactly-once 的準(zhǔn)確性 、輕量級 Checkpoint 容錯機制 、低
延時高吞吐和易用性高的特點,我們選擇了 Flink 作為實時計算引擎。
第三塊是實時存儲引擎,我們的要求就是需要有維度索引 、支持高并發(fā) 、預(yù)聚合 、高性能實時多維 OLAP
查詢 。可以看到, ?Hbase 、Tdsql 和 ES 都不能滿足要求, ?Druid 有?個缺陷, ?它是按照時序劃分 ? ? ? ? ? ?Segment,無法將同?個內(nèi)容,存放在同?個 Segment 上, ?計算全局 TopN 只能是近似值,所以我們選 擇了最近兩年大火的 MPP 數(shù)據(jù)庫引擎 ClickHouse。
2) 設(shè)計目標(biāo)與設(shè)計難點

我們多維實時數(shù)據(jù)分析系統(tǒng)分為三大模塊
1. 實時計算引擎
2. 實時存儲引擎
3. App 層
難點主要在前兩個模塊:實時計算引擎和實時存儲引擎。
1. 千萬級/s 的海量數(shù)據(jù)如何實時接? ,并且進行極低延遲維表關(guān)聯(lián)。
2. 實時存儲引擎如何支持高并發(fā)寫? 、高可用分布式和高性能索引查詢, ?是比較難的。這?個模塊的具體實現(xiàn),看?下我們系統(tǒng)的架構(gòu)設(shè)計。
3) 架構(gòu)設(shè)計

前端采用的是開源組件 Ant Design,利用了 Nginx 服務(wù)器,部署靜態(tài)頁面,并反向代理了瀏覽器的請求到后 臺服務(wù)器上。
后臺服務(wù)是基于騰訊自研的 RPC 后臺服務(wù)框架寫的,并且會進??些?級緩存。
實時數(shù)倉部分,分為了接?層 、實時計算層和實時數(shù)倉存儲層。
接?層主要是從千萬級/s 的原始消息隊列中,拆分出不同?為數(shù)據(jù)的微隊列,拿看點的視頻來說,拆分過 后,數(shù)據(jù)就只有百萬級/s 了;
實時計算層主要負(fù)責(zé), ?多??為流水?dāng)?shù)據(jù)進??轉(zhuǎn)列, ?實時關(guān)聯(lián)用戶畫像數(shù)據(jù)和內(nèi)容維度數(shù)據(jù);
實時數(shù)倉存儲層主要是設(shè)計出符合看點業(yè)務(wù)的,下游好用的實時消息隊列 。我們暫時提供了兩個消息隊
列,作為實時數(shù)倉的兩層 。?層 DWM 層是內(nèi)容 ID-用戶 ID 粒度聚合的,就是?條數(shù)據(jù)包含內(nèi)容 ID-用戶 ID 還有 B 側(cè)內(nèi)容數(shù)據(jù) 、C 側(cè)用戶數(shù)據(jù)和用戶畫像數(shù)據(jù);另?層是 DWS 層, ?是內(nèi)容 ID 粒度聚合的,? 條 數(shù)據(jù)包含內(nèi)容 ID, ?B 側(cè)數(shù)據(jù)和 C 側(cè)數(shù)據(jù) 。可以看到內(nèi)容 ID-用戶 ID 粒度的消息隊列流量進?步減小到? 萬級/s, ?內(nèi)容 ID 粒度的更是萬級/s,并且格式更加清晰,維度信息更加豐富。
實時存儲部分分為實時寫?層 、OLAP 存儲層和后臺接?層。
實時寫?層主要是負(fù)責(zé) Hash 路由將數(shù)據(jù)寫?;
OLAP 存儲層利用 MPP 存儲引擎,設(shè)計符合業(yè)務(wù)的索引和物化視圖, ?高效存儲海量數(shù)據(jù);后臺接?層提供高效的多維實時查詢接? 。
4) 實時計算
這個系統(tǒng)最復(fù)雜的兩塊, ?實時計算和實時存儲。
先介紹實時計算部分:分為實時關(guān)聯(lián)和實時數(shù)倉。
1. 實時高性能維表關(guān)聯(lián)

實時維表關(guān)聯(lián)這?塊難度在于 百萬級/s 的實時數(shù)據(jù)流,如果直接去關(guān)聯(lián) HBase, ?1 分鐘的數(shù)據(jù),關(guān)聯(lián)完 HBase 耗時是?時級的,會導(dǎo)致數(shù)據(jù)延遲嚴(yán)重。
我們提出了?個解決方案:
第?個是,在 Flink 實時計算環(huán)節(jié),先按照 1 分鐘進行了窗?聚合,將窗?內(nèi)多行行為數(shù)據(jù)轉(zhuǎn)?行多列的 數(shù)據(jù)格式,經(jīng)過這?步操作,原本?時級的關(guān)聯(lián)耗時下降到了??分鐘,但是還是不夠的。
第二個是,在訪問 HBase 內(nèi)容之前設(shè)置?層 Redis 緩存, ?因為 1000 條數(shù)據(jù)訪問 HBase 是秒級的,而訪
問 Redis 是毫秒級的,訪問 Redis 的速度基本是訪問 HBase 的 1000 倍 。為了防止過期的數(shù)據(jù)浪費緩 ? ? 存,緩存過期時間設(shè)置成 24 ?時, ?同時通過監(jiān)聽寫 HBase Proxy 來保證緩存的?致性 。這樣將訪問時間
從??分鐘變成了秒級。
第三個是,上報過程中會上報不少非常規(guī)內(nèi)容 ID, ?這些內(nèi)容 ID 在內(nèi)容 HBase 中是不存儲的,會造成緩存
穿透的問題 。所以在實時計算的時候,我們直接過濾掉這些內(nèi)容 ID, ?防止緩存穿透, ??減少?些時間 。? 第四個是, ?因為設(shè)置了定時緩存,會引??個緩存雪崩的問題 。為了防止雪崩,我們在實時計算中, ?進行
了削峰填谷的操作,錯開設(shè)置緩存的時間。
可以看到,優(yōu)化前后,數(shù)據(jù)量從百億級減少到了?億級,耗時從?t時級減少到了數(shù)?秒,減少 99%。
2. 下游提供服務(wù)

實時數(shù)倉的難度在于:它處于比較新的領(lǐng)域,并且各個公司各個業(yè)務(wù)差距比較大,怎么能設(shè)計出?便,好用, 符合看點業(yè)務(wù)場景的實時數(shù)倉是有難度的。
先看?下實時數(shù)倉做了什么, ?實時數(shù)倉對外就是?個消息隊列,不同的消息隊列里面存放的就是不同聚合粒度 的實時數(shù)據(jù), ?包括內(nèi)容 ID 、用戶 ID 、C 側(cè)行為數(shù)據(jù) 、B 側(cè)內(nèi)容維度數(shù)據(jù)和用戶畫像數(shù)據(jù)等。
我們是怎么搭建實時數(shù)倉的,就是上面介紹的實時計算引擎的輸出,放到消息隊列中保存,可以提供給下游多 用戶復(fù)用。
我們可以看下, ?在我們建設(shè)實時數(shù)據(jù)倉庫前后, ?開發(fā)?個實時應(yīng)用的區(qū)別 。沒有數(shù)倉的時候,我們需要消費千 萬級/s 的原始隊列, ?進行復(fù)雜的數(shù)據(jù)清洗,然后再進行用戶畫像關(guān)聯(lián) 、內(nèi)容維度關(guān)聯(lián),才能拿到符合要求格式 的實時數(shù)據(jù), ?開發(fā)和擴展的成本都會比較高,如果想開發(fā)?個新的應(yīng)用, ??要走?遍這個流程 。有了數(shù)倉之 ? 后,如果想開發(fā)內(nèi)容 ID 粒度的實時應(yīng)用,就直接申請 TPS 萬級/s 的 DWS 層的消息隊列 。開發(fā)成本變低很 ? ?多, ?資源消耗?很多,可擴展性也強很多。
看個實際例?, ?開發(fā)我們系統(tǒng)的實時數(shù)據(jù)大屏,原本需要進行如上所有操作,才能拿到數(shù)據(jù) 。現(xiàn)在只需要消費 DWS 層消息隊列, ?寫?條 Flink SQL 即可,僅消耗 2 個 CPU 核?, ?1G 內(nèi)存。
可以看到, ?以 50 個消費者為例, ?建立實時數(shù)倉前后,下游開發(fā)?個實時應(yīng)用,可以減少 98%的資源消耗 。包 括計算資源,存儲資源,??成本和開發(fā)?員學(xué)習(xí)接?成本等等 。并且消費者越多, ?節(jié)省越多 。就拿 Redis 存 儲這?部分來說,?個?就能省下上百萬?民幣。
5) 實時存儲
介紹完實時計算,再來介紹實時存儲。
這塊分為三個部分來介紹
第?是 分布式-高可用
第?是 海量數(shù)據(jù)-寫?
第三是 高性能-查詢

我們這里聽取的是 Clickhouse 官方的建議,借助 ZK 實現(xiàn)高可用的方案 。數(shù)據(jù)寫??個分片,僅寫??個副 本,然后再寫 ZK,通過 ZK 告訴同?個分片的其他副本,其他副本再過來拉取數(shù)據(jù),保證數(shù)據(jù)?致性。
這里沒有選用消息隊列進?數(shù)據(jù)同步, ?是因為 ZK 更加輕量級 。而且寫的時候,任意寫?個副本,其它副本都 能夠通過 ZK 獲得?致的數(shù)據(jù) 。而且就算其它節(jié)點第?次來獲取數(shù)據(jù)失敗了,后?只要發(fā)現(xiàn)它跟 ZK 上記錄的 數(shù)據(jù)不?致,就會再次嘗試獲取數(shù)據(jù),保證?致性。
2. 海量數(shù)據(jù)-寫入

數(shù)據(jù)寫?遇到的第?個問題是,海量數(shù)據(jù)直接寫? Clickhouse 的話,會導(dǎo)致 ZK 的 QPS 太高,解決方案是改 用 Batch 方式寫? 。Batch 設(shè)置多大呢, ?Batch 太?的話緩解不了 ZK 的壓? , ?Batch 也不能太大,不然上游 內(nèi)存壓?太大,通過實驗, ?最終我們選用了大???萬的 Batch。
第?個問題是, ?隨著數(shù)據(jù)量的增?, ?單 QQ 看點的視頻內(nèi)容每天可能寫?百億級的數(shù)據(jù),默認(rèn)方案是寫?張分 布式表, ?這就會造成單臺機器出現(xiàn)磁盤的瓶頸,尤其是 Clickhouse 底層運用的是 Mergetree,原理類似于 ? ? HBase 、RocketsDB 的底層 LSM-Tree。在合并的過程中會存在寫放大的問題,加重磁盤壓? 。峰值每分鐘? 千萬條數(shù)據(jù), ?寫完耗時??秒,如果正在做 Merge,就會阻塞寫?請求,查詢也會?常慢 。我們做的兩個優(yōu)化 方案:?是對磁盤做 Raid,提升磁盤的 IO;?是在寫?之前進?分表, ?直接分開寫?到不同的分片上, ?磁盤 ?壓?直接變?yōu)?1/N。
第三個問題是,雖然我們寫?按照分片進?了劃分,但是這里引?了?個分布式系統(tǒng)常?的問題,就是局部的 Top 并非全局 Top 的問題 。比如同?個內(nèi)容 ID 的數(shù)據(jù)落在了不同的分片上, ?計算全局 Top100 閱讀的內(nèi)容 ? ?ID,有?個內(nèi)容 ID 在分片 1 上是 Top100,但是在其它分片上不是 Top100,導(dǎo)致匯總的時候,會丟失?部分 數(shù)據(jù),影響最終結(jié)果 。我們做的優(yōu)化是在寫?之前加上?層路由,將同?個內(nèi)容 ID 的記錄,全部路由到同? ?個分片上, ?解決了該問題。
介紹完寫?,下?步介紹 Clickhouse 的高性能存儲和查詢。
3. 高性能-存儲-查詢
Clickhouse 高性能查詢的?個關(guān)鍵點是稀疏索引 。稀疏索引這個設(shè)計就很有講究,設(shè)計得好可以加速查詢,設(shè) 計不好反而會影響查詢效率 。我根據(jù)我們的業(yè)務(wù)場景, ?因為我們的查詢大部分都是時間和內(nèi)容 ID 相關(guān)的, ?比 ? 如說,某個內(nèi)容,過去 N 分鐘在各個?群表現(xiàn)如何?我按照?期,分鐘粒度時間和內(nèi)容 ID 建立了稀疏索引 。?針對某個內(nèi)容的查詢, ?建立稀疏索引之后,可以減少 99%的?件掃描。
還有?個問題就是,我們現(xiàn)在數(shù)據(jù)量太大,維度太多 。拿 QQ 看點的視頻內(nèi)容來說,? 天流水有上百億條,有 些維度有?百個類別 。如果?次性把所有維度進?預(yù)聚合,數(shù)據(jù)量會指數(shù)膨脹,查詢反而變慢,并且會占用大 量內(nèi)存空間 。我們的優(yōu)化,針對不同的維度, ?建立對應(yīng)的預(yù)聚合物化視圖,用空間換時間, ?這樣可以縮短查詢 的時間。
騰訊看點高性能存儲

分布式表查詢還會有?個問題,查詢單個內(nèi)容 ID 的信息,分布式表會將查詢下發(fā)到所有的分片上, ?然后再返 ?回查詢結(jié)果進?匯總 。實際上, ?因為做過路由,?個內(nèi)容 ID 只存在于?個分片上, ?剩下的分片都在空跑 。針 ?對這類查詢,我們的優(yōu)化是后臺按照同樣的規(guī)則先進?路由, ?直接查詢目標(biāo)分片, ?這樣減少了 N-1/N 的負(fù)載, 可以大量縮短查詢時間 。而且由于我們是提供的 OLAP 查詢,數(shù)據(jù)滿足最終?致性即可,通過主從副本讀寫分 離,可以進?步提升性能。
我們在后臺還做了?個 1 分鐘的數(shù)據(jù)緩存,針對相同條件查詢,后臺就直接返回了。
4. 擴容
這里再介紹?下我們的擴容的?案,調(diào)研了業(yè)內(nèi)的?些常??案。
比如 HBase,原始數(shù)據(jù)都存放在 HDFS 上, ?擴容只是 Region Server 擴容,不涉及原始數(shù)據(jù)的遷移 。但是 Clickhouse 的每個分片數(shù)據(jù)都是在本地, ?是?個比較底層存儲引擎,不能像 HBase 那樣?便擴容。
Redis 是哈希槽這種類似?致性哈希的方式, ?是比較經(jīng)典分布式緩存的方案 。Redis slot 在 Rehash 的過程中 雖然存在短暫的 ask 讀不可用,但是總體來說遷移是比較方便的,從原 h[0]遷移到 h[1], ?最后再刪除 h[0]。但是 Clickhouse ?部分都是 OLAP 批量查詢,不是點查,而且由于列式存儲,不支持刪除的特性,? 致性哈 希的方案不是很適合。
目前擴容的方案是, ?另外消費?份數(shù)據(jù), ?寫?新 Clickhouse 集群,兩個集群?起跑?段時間, ?因為實時數(shù)據(jù) 就保存 3 天,等 3 天之后,后臺服務(wù)直接訪問新集群。
4. 有贊實時數(shù)倉案例
1) 分層設(shè)計
傳統(tǒng)離線數(shù)倉的分層設(shè)計?家都很熟悉,為了規(guī)范的組織和管理數(shù)據(jù),層級劃分會比較多,在?些復(fù)雜邏輯處 理場景還會引?臨時層落地中間結(jié)果以方便下游加?處理 。實時數(shù)倉考慮到時效性問題,分層設(shè)計需要盡量精
簡, ?降低中間流程出錯的可能性,不過總體而?, ?實時數(shù)倉還是會參考離線數(shù)倉的分層思想來設(shè)計。
實時數(shù)倉分層架構(gòu)如下圖所示 :

ODS ?( 實時數(shù)據(jù)接入層)
ODS 層, ?即實時數(shù)據(jù)接?層,通過數(shù)據(jù)采集?具收集各個業(yè)務(wù)系統(tǒng)的實時數(shù)據(jù),對非結(jié)構(gòu)化的數(shù)據(jù)進?結(jié)構(gòu)化 處理,保存原始數(shù)據(jù),?乎不過濾數(shù)據(jù);該層數(shù)據(jù)的主要來源有三個部分:第?部分是業(yè)務(wù)方創(chuàng)建的 NSQ 消 ?息,第?部分是業(yè)務(wù)數(shù)據(jù)庫的 Binlog 日志,第三部分是埋點日志和應(yīng)用程序日志, ?以上三部分的實時數(shù)據(jù)最終 統(tǒng)?寫? Kafka 存儲介質(zhì)中。
ODS 層表命名規(guī)范:部門名稱.應(yīng)用名稱.數(shù)倉層級主題域前綴數(shù)據(jù)庫名/消息名
例如:接?業(yè)務(wù)庫的 Binlog
實時數(shù)倉表命名:? deptname.appname.ods_subjectname_tablename
例如:接?業(yè)務(wù)方的 NSQ 消息
實時數(shù)倉表命名:? deptname.appname.ods_subjectname_msgname
DWS ?( 實時明細(xì)中間層)
DWS 層, ?即實時明細(xì)中間層,該層以業(yè)務(wù)過程作為建模驅(qū)動,基于每個具體的業(yè)務(wù)過程事件來構(gòu)建最細(xì)粒度 的明細(xì)層事實表;?比如交易過程,有下單事件 、支付事件 、發(fā)貨事件等,我們會基于這些獨立的事件來進行明 細(xì)層的構(gòu)建 。在這層,事實明細(xì)數(shù)據(jù)同樣是按照離線數(shù)倉的主題域來進行劃分,也會采用維度建模的方式組織 數(shù)據(jù),對于?些重要的維度字段,會做適當(dāng)冗余 。基于有贊實時需求的場景, ?重點建設(shè)交易 、營銷 、客戶 、店 鋪 、商品等主題域的數(shù)據(jù) 。該層的數(shù)據(jù)來源于 ODS 層,通過 FlinkSQL 進行 ETL 處理, ?主要?作有規(guī)范命 ? ? 名 、數(shù)據(jù)清洗 、維度補全 、多流關(guān)聯(lián), ?最終統(tǒng)?寫? Kafka 存儲介質(zhì)中。
DWS 層表命名規(guī)范:? 部門名稱 .應(yīng)用名稱 .數(shù)倉層級_主題域前綴_數(shù)倉表命名
例如:實時事件 A 的中間層
實時數(shù)倉表命名:? deptname.appname.dws_subjectname_tablename_eventnameA
例如:實時事件 B 的中間層
實時數(shù)倉表命名:? deptname.appname.dws_subjectname_tablename_eventnameB
DIM ?( 實時維表層)
DIM 層, ?即實時維表層,用來存放維度數(shù)據(jù), ?主要用于實時明細(xì)中間層寬化處理時補全維度使用, ? 目前該層的 數(shù)據(jù)主要存儲于 HBase 中,后續(xù)會基于 QPS 和數(shù)據(jù)量大?提供更多合適類型的存儲介質(zhì)。
DIM 層表命名規(guī)范:? 應(yīng)用名稱_數(shù)倉層級_主題域前綴_數(shù)倉表命名
例如:HBase 存儲, ?實時維度表
實時數(shù)倉表命名:? appname_dim_tablename
DWA ?( 實時匯總層)
DWA 層, ?即實時匯總層,該層通過 DWS 層數(shù)據(jù)進行多維匯總,提供給下游業(yè)務(wù)方使用,在實際應(yīng)用過程中, 不同業(yè)務(wù)方使用維度匯總的方式不太?樣,根據(jù)不同的需求采用不同的技術(shù)方案去實現(xiàn) 。第?種方式,采用 ? ?FlinkSQL 進行實時匯總,將結(jié)果指標(biāo)存? HBase 、MySQL 等數(shù)據(jù)庫,該種方式是我們早期采用的方案,優(yōu)點 是實現(xiàn)業(yè)務(wù)邏輯比較靈活,缺點是聚合粒度固化,不易擴展;第?種方式,采用實時 OLAP ?具進行匯總,該 種方式是我們目前常用的方案,優(yōu)點是聚合粒度易擴展,缺點是業(yè)務(wù)邏輯需要在中間層預(yù)處理。
DWA 層表命名規(guī)范:? 應(yīng)用名稱_數(shù)倉層級_主題域前綴_聚合粒度_數(shù)據(jù)范圍
例如:HBase 存儲,某域當(dāng)日某粒度實時匯總表
實時數(shù)倉表命名:? appname_dwa_subjectname_aggname_daily
APP ?( 實時應(yīng)用層)
APP 層, ?即實時應(yīng)用層,該層數(shù)據(jù)已經(jīng)寫入應(yīng)用系統(tǒng)的存儲中,例如寫入 Druid 作為 BI 看板的實時數(shù)據(jù)集;寫入 HBase 、MySQL 用于提供統(tǒng)?數(shù)據(jù)服務(wù)接口;寫入 ClickHouse 用于提供實時 OLAP 服務(wù) 。因為該層非 常貼近業(yè)務(wù),在命名規(guī)范上實時數(shù)倉不做統(tǒng)?要求。
2) 實時 ETr
實時數(shù)倉 ETL 處理過程所涉及的組件比較多,接下來盤點構(gòu)建實時數(shù)倉所需要的組件以及每個組件的應(yīng)用場 景 。如下圖所示:

具體實時 ETL 處理流程如下圖所示:

1. 維度補全
創(chuàng)建調(diào)用 Duboo 接口的 UDF 函數(shù)在實時流里補全維度是最便捷的使用方式,但如果請求量過大,對 Duboo ?接口壓力會過大 。在實際應(yīng)用場景補全維度首選還是關(guān)聯(lián)維度表,但關(guān)聯(lián)也存在?定概率的丟失問題,為了彌 補這種丟失,可以采用 Duboo 接口調(diào)用兜底的方式來補全 。偽代碼如下:
create function call_dubbo as 'XXXXXXX';
create function get_json_object as 'XXXXXXX';
case
when cast( b.column as bigint) is not null
then cast( b.column as bigint)
else cast(coalesce(cast(get_json_object(call_dubbo( 'clusterUrl '
, 'serviceName '
, 'methodName '
,cast(concat( ' [ ',cast(a.column as varchar), '] ') as varchar)
, 'key '
)
, 'rootId ')
as bigint)
,a.column)
as bigint) ?end
2. 幕等處理
實時任務(wù)在運?過程中難免會遇到執(zhí)?異常的情況, ?當(dāng)任務(wù)異常重啟的時候會導(dǎo)致部分消息重新發(fā)送和消費, 從而引發(fā)下游實時統(tǒng)計數(shù)據(jù)不準(zhǔn)確,為了有效避免這種情況,可以選擇對實時消息流做冪等處理, ?當(dāng)消費完? 條消息,將這條消息的 Key 存? KV,如果任務(wù)異常重啟導(dǎo)致消息重新發(fā)送的時候,先從 KV 判斷該消息是否 已被消費,如果已消費就不再往下發(fā)送 。偽代碼如下:
create function idempotenc as 'XXXXXXX';
insert into table
select
order_no
from
(
select
a.orderNo ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?as ?order_no
, idempotenc( 'XXXXXXX', coalesce( order_no, ' ') ) ?as ?rid
from
table1
) t
where
t.rid = 0;
3. 數(shù)據(jù)驗證
由于實時數(shù)倉的數(shù)據(jù)是?邊界的流,相比于離線數(shù)倉固定不變的數(shù)據(jù)更難驗收 。基于不同的場景,我們提供了 2 種驗證?式,分別是:抽樣驗證與全量驗證 。如下圖所示
抽樣驗證?案
該?案主要應(yīng)用在數(shù)據(jù)準(zhǔn)確性驗證上, ?實時匯總結(jié)果是基于存儲在 Kafka 的實時明細(xì)中間層計算而來,但 ? ? ? Kafka 本身不?持按照特定條件檢索,不?持寫查詢語句,再加上消息的?邊界性,統(tǒng)計結(jié)果是在不斷變化 ? ?的,很難尋找參照物進?比對 。鑒于此,我們采用了持久化消息的?法,將消息落盤到 TiDB 存儲,基于 TiDB 的能?對落盤的消息進?檢索 、查詢 、匯總 。編寫固定時間邊界的測試用例與相同時間邊界的業(yè)務(wù)庫數(shù)據(jù)或者 離線數(shù)倉數(shù)據(jù)進?比對 。通過以上?式,抽樣核?店鋪的數(shù)據(jù)進?指標(biāo)準(zhǔn)確性驗證,確保測試用例全部通過。
全量驗證?案
該?案主要應(yīng)用在數(shù)據(jù)完整性和?致性驗證上, ?在實時維度表驗證的場景使用最多 。大體思路:將存儲實時維 度表的在線 HBase 集群中的數(shù)據(jù)同步到離線 HBase 集群中,再將離線 HBase 集群中的數(shù)據(jù)導(dǎo)?到 Hive 中, 在限定實時維度表的時間邊界后,通過數(shù)據(jù)平臺提供的數(shù)據(jù)校驗功能, ?比對實時維度表與離線維度表是否存在 差異, ?最終確保兩張表的數(shù)據(jù)完全?致。

4. 數(shù)據(jù)恢復(fù)
實時任務(wù)?旦上線就要求持續(xù)不斷的提供準(zhǔn)確 、穩(wěn)定的服務(wù) 。區(qū)別于離線任務(wù)按天調(diào)度,如果離線任務(wù)出現(xiàn) ? Bug,會有充足的時間去修復(fù) 。如果實時任務(wù)出現(xiàn) Bug,必須按照提前制定好的流程,嚴(yán)格按照步驟執(zhí)行,否 則極易出現(xiàn)問題 。造成 Bug 的情況有非常多, ?比如代碼 Bug 、異常數(shù)據(jù) Bug 、實時集群 Bug,如下圖展示了 修復(fù)實時任務(wù) Bug 并恢復(fù)數(shù)據(jù)的流程。

5. 騰訊全場景實時數(shù)倉建設(shè)案例
在數(shù)倉體系中會有各種各樣的大數(shù)據(jù)組件,譬如 Hive/HBase/HDFS/S3,計算引擎如 MapReduce 、Spark 、 ?Flink,根據(jù)不同的需求,用戶會構(gòu)建大數(shù)據(jù)存儲和處理平臺,數(shù)據(jù)在平臺經(jīng)過處理和分析,結(jié)果數(shù)據(jù)會保存到 MySQL 、Elasticsearch 等支持快速查詢的關(guān)系型 、非關(guān)系型數(shù)據(jù)庫中,接下來應(yīng)用層就可以基于這些數(shù)據(jù)進 行 BI 報表開發(fā) 、用戶畫像,或基于 Presto 這種 OLAP 工具進行交互式查詢等。

1) Lambda 架構(gòu)的痛點
在整個過程中我們常常會用?些離線的調(diào)度系統(tǒng),定期的 ?( T+1 或者每隔??時) ?去執(zhí)行?些 Spark 分析任 ? ?務(wù),做?些數(shù)據(jù)的輸? 、輸出或是 ETL ?作 。離線數(shù)據(jù)處理的整個過程中必然存在數(shù)據(jù)延遲的現(xiàn)象,不管是數(shù) 據(jù)接?還是中間的分析,數(shù)據(jù)的延遲都是比較大的,可能是?時級也有可能是天級別的 。另外?些場景中我們 也常常會為了?些實時性的需求去構(gòu)建?個實時處理過程, ?比如借助 Flink+Kafka 去構(gòu)建實時的流處理系統(tǒng)。
整體上, ?數(shù)倉架構(gòu)中有非常多的組件,大大增加了整個架構(gòu)的復(fù)雜性和運維的成本。
如下圖, ?這是很多公司之前或者現(xiàn)在正在采用的 Lambda 架構(gòu), ?Lambda 架構(gòu)將數(shù)倉分為離線層和實時層,相 應(yīng)的就有批處理和流處理兩個相互獨立的數(shù)據(jù)處理流程, ?同?份數(shù)據(jù)會被處理兩次以上, ?同?套業(yè)務(wù)邏輯代碼 需要適配性的開發(fā)兩次 。Lambda 架構(gòu)大家應(yīng)該已經(jīng)非常熟悉了,下面我就著重介紹?下我們采用 Lambda 架 構(gòu)在數(shù)倉建設(shè)過程中遇到的?些痛點問題。

例如在實時計算?些用戶相關(guān)指標(biāo)的實時場景下, ?我們想看到當(dāng)前 pv 、uv 時,我們會將這些數(shù)據(jù)放到實時層 去做?些計算, ?這些指標(biāo)的值就會實時呈現(xiàn)出來,但同時想了解用戶的?個增?趨勢, ?需要把過去?天的數(shù)據(jù) 計算出來 。這樣就需要通過批處理的調(diào)度任務(wù)來實現(xiàn), ?比如凌晨兩三點的時候在調(diào)度系統(tǒng)上起?個 Spark 調(diào)度 任務(wù)把當(dāng)天所有的數(shù)據(jù)重新跑?遍。
很顯然在這個過程中, ?由于兩個過程運?的時間是不?樣的,跑的數(shù)據(jù)卻相同, ?因此可能造成數(shù)據(jù)的不?致 。因為某?條或?條數(shù)據(jù)的更新, ?需要重新跑?遍整個離線分析的鏈路,數(shù)據(jù)更新成本很?, ?同時需要維護離線 和實時分析兩套計算平臺,整個上下兩層的開發(fā)流程和運維成本其實都是?常高的。
為了解決 Lambda 架構(gòu)帶來的各種問題,就誕生了 Kappa 架構(gòu), ?這個架構(gòu)?家應(yīng)該也?常的熟悉。
2) Kappa 架構(gòu)的痛點
我們來講?下 Kappa 架構(gòu),如下圖, ?它中間其實用的是消息隊列,通過用 Flink 將整個鏈路串聯(lián)起來 。Kappa 架構(gòu)解決了 Lambda 架構(gòu)中離線處理層和實時處理層之間由于引擎不?樣,導(dǎo)致的運維成本和開發(fā)成本高昂的 問題,但 Kappa 架構(gòu)也有其痛點。
首先,在構(gòu)建實時業(yè)務(wù)場景時,會用到 Kappa 去構(gòu)建?個近實時的場景,但如果想對數(shù)倉中間層例如
ODS 層做?些簡單的 OLAP 分析或者進?步的數(shù)據(jù)處理時,如將數(shù)據(jù)寫到 DWD 層的 Kafka,則需要另外 接? Flink。同時, ?當(dāng)需要從 DWD 層的 Kafka 把數(shù)據(jù)再導(dǎo)?到 Clickhouse, ?Elasticsearch, ?MySQL 或 ?者是 Hive 里?做進?步的分析時, ?顯然就增加了整個架構(gòu)的復(fù)雜性。
其次, ?Kappa 架構(gòu)是強烈依賴消息隊列的,我們知道消息隊列本身在整個鏈路上數(shù)據(jù)計算的準(zhǔn)確性是嚴(yán)格
依賴它上游數(shù)據(jù)的順序,消息隊列接的越多,發(fā)生亂序的可能性就越? 。ODS 層數(shù)據(jù)?般是絕對準(zhǔn)確的, 把 ODS 層的數(shù)據(jù)發(fā)送到下?個 kafka 的時候就有可能發(fā)生亂序, ?DWD 層再發(fā)到 DWS 的時候可能?亂序 了, ?這樣數(shù)據(jù)不?致性就會變得很嚴(yán)重。
第三, ?Kafka 由于它是?個順序存儲的系統(tǒng),順序存儲系統(tǒng)是沒有辦法直接在其上?利用 OLAP 分析的? 些優(yōu)化策略,例如謂詞下推這類的優(yōu)化策略,在順序存儲的 Kafka 上來實現(xiàn)是比較困難的事情。
那么有沒有這樣?個架構(gòu), ?既能夠滿足實時性的需求, ??能夠滿足離線計算的要求,而且還能夠減輕運維開發(fā) 的成本,解決通過消息隊列構(gòu)建 Kappa 架構(gòu)過程中遇到的?些痛點?答案是肯定的,后?的篇幅會詳細(xì)論 ? ? 述。

3) 痛點總結(jié)
傳統(tǒng) T+1 任務(wù)
海量的TB級 T+ 1 任務(wù)延遲導(dǎo)致下游數(shù)據(jù)產(chǎn)出時間不穩(wěn)定。
任務(wù)遇到故障重試恢復(fù)代價昂貴
數(shù)據(jù)架構(gòu)在處理去重和 exactly-once語義能??面比較吃?
架構(gòu)復(fù)雜,涉及多個系統(tǒng)協(xié)調(diào), ?靠調(diào)度系統(tǒng)來構(gòu)建任務(wù)依賴關(guān)系
Lambda 架構(gòu)痛點
同時維護實時平臺和離線平臺兩套引擎, ?運維成本高
實時離線兩個平臺需要維護兩套框架不同但業(yè)務(wù)邏輯相同代碼, ?開發(fā)成本高
數(shù)據(jù)有兩條不同鏈路,容易造成數(shù)據(jù)的不?致性
數(shù)據(jù)更新成本大, ?需要重跑鏈路
Kappa 架構(gòu)痛點
對消息隊列存儲要求高,消息隊列的回溯能?不及離線存儲
消息隊列本身對數(shù)據(jù)存儲有時效性,且當(dāng)前?法使用 OLAP 引擎直接分析消息隊列中的數(shù)據(jù) ?全鏈路依賴消息隊列的實時計算可能因為數(shù)據(jù)的時序性導(dǎo)致結(jié)果不正確
4)實時數(shù)倉建設(shè)需求
是否存在?種存儲技術(shù), ?既能夠?持?jǐn)?shù)據(jù)高效的回溯能? ,?持?jǐn)?shù)據(jù)的更新, ??能夠?qū)崿F(xiàn)數(shù)據(jù)的批流讀寫,并 且還能夠?qū)崿F(xiàn)分鐘級到秒級的數(shù)據(jù)接??
這也是實時數(shù)倉建設(shè)的迫切需求 。實際上是可以通過對 Kappa 架構(gòu)進行升級, ?以解決 Kappa 架構(gòu)中遇到的? 些問題,接下來主要分享當(dāng)前比較火的數(shù)據(jù)湖技術(shù)--Iceberg。

5) 數(shù)據(jù)湖 Apache Iceberg 的介紹
1.Iceberg 是什么
首先介紹?下什么是 Iceberg。官網(wǎng)描述如下:
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.
Iceberg 的官方定義是?種表格式,可以簡單理解為是基于計算層 ?( Flink , Spark) ?和存儲層 ?( ORC, ? ? ? ? ?Parqurt,Avro) ?的?個中間層,用 Flink 或者 Spark 將數(shù)據(jù)寫入 Iceberg,然后再通過其他方式來讀取這個 表, ?比如 Spark, ?Flink, ?Presto 等。

Iceberg 是為分析海量數(shù)據(jù)準(zhǔn)備的,被定義為 table format,table format 介于計算層和存儲層之間。
tableformat 主要用于向下管理在存儲系統(tǒng)上的?件, ?向上為計算層提供?些接? 。存儲系統(tǒng)上的?件存儲都 會采用?定的組織形式,譬如讀?張 Hive 表的時候, ?HDFS ?件系統(tǒng)會帶?些 partition,數(shù)據(jù)存儲格式 、數(shù) ?據(jù)壓縮格式 、數(shù)據(jù)存儲 HDFS 目錄的信息等, ?這些信息都存在 Metastore 上, ?Metastore 就可以稱之為?種? 件組織格式。
?個優(yōu)秀的?件組織格式,如 Iceberg,可以更高效的?持上層的計算層訪問磁盤上的?件,做?些 list、 rename 或者查找等操作。
3.Iceberg 的能力總結(jié)
Iceberg 目前?持三種?件格式 parquet,Avro, ?ORC,?論是 HDFS 或者 S3 上的?件,可以看到有?存也 有列存,后面會詳細(xì)的去介紹其作用 。Iceberg 本身具備的能?總結(jié)如下, ?這些能?對于后面我們利用 ? ? ? ? ?Iceberg 來構(gòu)建實時數(shù)倉是非常重要的。

4.Iceberg 的文件組織格式介紹
下圖展示的是 Iceberg 的整個?件組織格式 。從上往下看:
首先最上層是 snapshot 模塊 。Iceberg 里面的 snapshot 是?個用戶可讀取的基本的數(shù)據(jù)單位,也就是說 用戶每次讀取?張表里面的所有數(shù)據(jù),都是?個snapshot 下的數(shù)據(jù)。
其次, ?manifest。?個 snapshot 下面會有多個 manifest,如圖 snapshot-0 有兩個 manifest,而 snapshot-1 有三個 manifest,每個 manifest 下面會管理?個至多個 DataFiles ?件。
第三, ?DataFiles。manifest ?件里面存放的就是數(shù)據(jù)的元信息,我們可以打開 manifest ?件,可以看到 里面其實是???的 datafiles ?件路徑。
從圖上看到,snapshot-1 包含了 snapshop-0 的數(shù)據(jù),而 snapshot-1 這個時刻寫?的數(shù)據(jù)只有 manifest2, 這個能?其實就為我們后面去做增量讀取提供了?個很好的?持。

5.Iceberg 讀寫過程介紹
Apache Iceberg 讀寫
首先,如果有?個 write 操作,在寫 snapsho-1 的時候,snapshot-1 是虛線框,也就是說此時還沒有發(fā)生 commit 操作 。這時候?qū)?snapshot-1 的讀其實是不可讀的, ?因為用戶的讀只能讀到已經(jīng) commit 之后的 ? ?snapshot。發(fā)生 commit 之后才可以讀 。同理,會有 snapshot-2,snapshot-3。
Iceberg 提供的?個重要能力,就是讀寫分離能力 。在對 snapshot-4 進行寫的時候,其實是完全不影響對 snapshot-2 和 snapshot-3 的讀 。Iceberg 的這個能力對于構(gòu)建實時數(shù)倉是非常重要的能力之? 。

同理,讀也是可以并發(fā)的,可以同時讀 s1 、s2 、s3 的快照數(shù)據(jù), ?這就提供了回溯讀到 snapshot-2 或者 ? ? ? ? snapshot-3 數(shù)據(jù)的能力 。Snapshot-4 寫完成之后,會發(fā)生?次 commit 操作, ?這個時候 snapshot-4 變成了
實心,此時就可以讀了 。另外,可以看到 current Snapshot 的指針移到 s4,也就是說默認(rèn)情況下, ?用戶對? 張表的讀操作,都是讀 current Snapshot 指針?biāo)赶虻?Snapshot,但不會影響前面的 snapshot 的讀操作。
Apache Iceberg 增量讀
接下來講?下 Iceberg 的增量讀 。首先我們知道 Iceberg 的讀操作只能基于已經(jīng)提交完成的 snapshot-1,此 ?時會有?個 snapshot-2,可以看到每個 snapshot 都包含前面 snapshot 的所有數(shù)據(jù),如果每次都讀全量的數(shù) 據(jù),整個鏈路上對計算引擎來說,讀取的代價非常高。
如果只希望讀到當(dāng)前時刻新增的數(shù)據(jù), ?這個時候其實就可以根據(jù) Iceberg 的 snapshot 的回溯機制,僅讀取 snapshot1 到 snapshot2 的增量數(shù)據(jù),也就是紫色這塊的數(shù)據(jù)可以讀的。

同理 s3 也是可以只讀黃色的這塊區(qū)域的數(shù)據(jù), ?同時也可以讀 s3 到 s1 這塊的增量數(shù)據(jù),基于 Flink source 的 streaming reader 功能在內(nèi)部我們已經(jīng)實現(xiàn)這種增量讀取的功能,并且已經(jīng)在線上運行了 。剛才講到了?個非 常重要的問題, ?既然 Iceberg 已經(jīng)有了讀寫分離,并發(fā)讀,增量讀的功能, ?Iceberg 要跟 Flink 實現(xiàn)對接,那 ?么就必須實現(xiàn) Iceberg 的 sink。
實時小文件問題
社區(qū)現(xiàn)在已經(jīng)重構(gòu)了 Flink 里面的 FlinkIcebergSink,提供了 global committee 的功能,我們的架構(gòu)其實跟社 區(qū)的架構(gòu)是保持?致的, ?曲線框中的這塊內(nèi)容是 FlinkIcebergSink。
在有多個 IcebergStreamWriter 和?個 IcebergFileCommitter 的情況下,上游的數(shù)據(jù)寫到 IcebergStreamWriter 的時候,每個 writer 里面做的事情都是去寫 datafiles ?件。

當(dāng)每個 writer 寫完自己當(dāng)前這?批 datafiles 小文件的時候,就會發(fā)送消息給 IcebergmileCommitter,告訴它 可以提交了 。而 IcebergmileCommitter 收到信息的時,就?次性將 datafiles 的文件提交, ?進行?次 commit 操作。
commit 操作本身只是對?些原始信息的修改, ?當(dāng)數(shù)據(jù)都已經(jīng)寫到磁盤了, ?只是讓其從不可見變成可見 。在這 個情況下, ?Iceberg 只需要用?個 commit 即可完成數(shù)據(jù)從不可見變成可見的過程。
實時小文件合并
mlink 實時作業(yè)?般會長期在集群中運行,為了要保證數(shù)據(jù)的時效性,? 般會把 Iceberg commit 操作的時間周 期設(shè)成 30 秒或者是?分鐘 。當(dāng) mlink 作業(yè)跑?天時,如果是?分鐘?次 commit,? 天需要 1440 個 ? ? ? ? ? ? ?commit,如果 mlink 作業(yè)跑?個月commit 操作會更多 。甚至 snapshot commit 的時間間隔越短,生成的 ? ? ?snapshot 的數(shù)量會越多 。當(dāng)流式作業(yè)運行后,就會生成大量的小文件。
這個問題如果不解決的話, ?Iceberg 在 mlink 處理引擎上的 sink 操作就不可用了 。我們在內(nèi)部實現(xiàn)了?個叫做 data compaction operator 的功能, ?這個 operator 是跟著 mlink sink ?起走的 。當(dāng) Iceberg 的 ? ? ? ? ? ? ? ? ? ? mlinkIcebergSink 每完成?次 commit 操作的時候, ?它都會向下游 mileScanTaskGen 發(fā)送消息,告訴 ? ? ? ? ? ?mileScanTaskGen 已經(jīng)完成了?次 commit。

FileScanTaskGen 里面會有相關(guān)的邏輯,能夠根據(jù)用戶的配置或者當(dāng)前磁盤的特性來進?文件合并任務(wù)的生成 操作 。FileScanTaskGen 發(fā)送到 DataFileRewitre 的內(nèi)容其實就是在 FileScanTaskGen 里面生成的需要合并的 文件的列表 。同理, ?因為合并文件是需要?定的耗時操作,所以需要將其進?異步的操作分發(fā)到不同的task ? rewrite operator 中。
上面講過的 Iceberg 是有 commit 操作,對于 rewrite 之后的文件需要有?個新的 snapshot 。這里對 Iceberg 來說,也是?個 commit 操作,所以采用?個單并發(fā)的像 commit 操作?樣的事件。
整條鏈路下來,小文件的合并目前采用的是 commit 操作,如果 commit 操作后面阻塞了,會影響前面的寫? 操作, ?這塊我們后面會持續(xù)優(yōu)化。
6) Flink+Iceberg 構(gòu)建實時數(shù)倉
1.近實時的數(shù)據(jù)接入
前面介紹了 Iceberg 既支持讀寫分離, ?又支持并發(fā)讀 、增量讀 、小文件合并, ?還可以支持秒級到分鐘級的延 遲,基于這些優(yōu)勢我們嘗試采用 Iceberg 這些功能來構(gòu)建基于 Flink 的實時全鏈路批流?體化的實時數(shù)倉架 構(gòu)。
如下圖所示, ?Iceberg 每次的 commit 操作,都是對數(shù)據(jù)的可?性的改變, ?比如說讓數(shù)據(jù)從不可?變成可?, 在這個過程中,就可以實現(xiàn)近實時的數(shù)據(jù)記錄。

2.實時數(shù)倉 - 數(shù)據(jù)湖分析系統(tǒng)
此前需要先進行數(shù)據(jù)接?, ?比如用 Spark 的離線調(diào)度任務(wù)去跑?些數(shù)據(jù),拉取,抽取最后再寫?到 Hive 表里 面, ?這個過程的延時比較大 。有了 Iceberg 的表結(jié)構(gòu),可以中間使用 Flink,或者 spark streaming,完成近實 時的數(shù)據(jù)接? 。
基于以上功能,我們再來回顧?下前面討論的 Kappa 架構(gòu), ?Kappa 架構(gòu)的痛點上面已經(jīng)描述過, ?Iceberg 既然 能夠作為?個優(yōu)秀的表格式, ?既支持 Streaming reader, ?又可以支持 Streaming sink, ?是否可以考慮將 Kafka 替換成 Iceberg?
Iceberg 底層依賴的存儲是像 HDFS 或 S3 這樣的廉價存儲,而且 Iceberg 是支持 parquet 、orc 、Avro 這樣的 列式存儲 。有列式存儲的支持,就可以對 OLAP 分析進行基本的優(yōu)化,在中間層直接進行計算 。例如謂詞下推 最基本的 OLAP 優(yōu)化策略,基于 Iceberg snapshot 的 Streaming reader 功能,可以把離線任務(wù)天級別到小時 級別的延遲大大的降低, ?改造成?個近實時的數(shù)據(jù)湖分析系統(tǒng)。

在中間處理層,可以用 presto 進行?些簡單的查詢, ?因為 Iceberg 支持 Streaming read,所以在系統(tǒng)的中間 層也可以直接接? mlink, ?直接在中間層用 mlink 做?些批處理或者流式計算的任務(wù),把中間結(jié)果做進?步計算 后輸出到下游。
替換 Kafka 的優(yōu)劣勢:
總的來說,Iceberg 替換 Kafka 的優(yōu)勢主要包括:
實現(xiàn)存儲層的流批統(tǒng)?
中間層支持 OLAP 分析
完美支持高效回溯
存儲成本降低
當(dāng)然,也存在?定的缺陷,如:
數(shù)據(jù)延遲從實時變成近實時
對接其他數(shù)據(jù)系統(tǒng)需要額外開發(fā)工作
秒級分析 - 數(shù)據(jù)湖加速:
由于 Iceberg 本身是將數(shù)據(jù)文件全部存儲在 工DmS 上的, ?工DmS 讀寫這塊對于秒級分析的場景, ?還是不能夠完 全滿足我們的需求,所以接下去我們會在 Iceberg 底層支持 Alluxio 這樣?個緩存,借助于緩存的能?可以實 現(xiàn)數(shù)據(jù)湖的加速 。這塊的架構(gòu)也在我們未來的?個規(guī)劃和建設(shè)中。

(部分內(nèi)容來源網(wǎng)絡(luò),如有侵權(quán)請聯(lián)系刪除)