教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

Flink中watermark的生成規(guī)則及其功能【大數(shù)據(jù)面經(jīng)】

更新時(shí)間:2021年01月15日17時(shí)49分 來源:傳智教育 瀏覽次數(shù):

問題分析

假如我們自己寫一個(gè)流式框架。我們該如何處理消息。正常情況下,我們看到消息按照順序一個(gè)個(gè)發(fā)送,接受后按照順序處理,這是沒有什么問題的。然而也要考慮到一些特殊情況下,消息不在是按照順序發(fā)送,產(chǎn)生了亂序,這時(shí)候該怎么處理?

核心問題講解

(1)watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用watermark機(jī)制結(jié)合window來實(shí)現(xiàn)。我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過程和時(shí)間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說late element)。但是對(duì)于late element,我們又不能無限期的等下去,必須要有個(gè)機(jī)制來保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算。這個(gè)特別的機(jī)制,就是watermark。

(2)通常,在接收到source的數(shù)據(jù)后,應(yīng)該立刻生成watermark;但是,也可以在接收source后,應(yīng)用簡單的map或者filter操作,然后再生成watermark。

(3)如果延遲的數(shù)據(jù)有業(yè)務(wù)需要,則設(shè)置好允許延遲的時(shí)間,因?yàn)槲覀儾荒軣o限期的等下去。每個(gè)窗口都有屬于自己的最大等待延遲數(shù)據(jù)的時(shí)間限制,窗口結(jié)束時(shí)間+延遲時(shí)間=最大waterMark值,即當(dāng)waterMark值大于的上述計(jì)算出的最大waterMark值,該窗口內(nèi)的數(shù)據(jù)就屬于遲到的數(shù)據(jù),無法參與window計(jì)算。

問題擴(kuò)展

  • Window:Window是處理無界流的關(guān)鍵,Windows將流拆分為一個(gè)個(gè)有限大小的buckets,可以在每一個(gè)buckets中進(jìn)行計(jì)算。
  • start_time,end_time:當(dāng)Window時(shí)時(shí)間窗口的時(shí)候,每個(gè)window都會(huì)有一個(gè)開始時(shí)間和結(jié)束時(shí)間(前開后閉),這個(gè)時(shí)間是系統(tǒng)時(shí)間。
  • event-time: 事件發(fā)生時(shí)間,是事件發(fā)生所在設(shè)備的當(dāng)?shù)貢r(shí)間,比如一個(gè)點(diǎn)擊事件的時(shí)間發(fā)生時(shí)間,是用戶點(diǎn)擊操作所在的手機(jī)或電腦的時(shí)間。
  • Watermarks:可以把他理解為一個(gè)水位線,這個(gè)Watermarks在不斷的變化,一旦Watermarks大于了某個(gè)window的end_time,就會(huì)觸發(fā)此window的計(jì)算,Watermarks就是用來觸發(fā)window計(jì)算的。

結(jié)合項(xiàng)目中使用

watermark如何處理亂序數(shù)據(jù)?
假如我們設(shè)置10s的時(shí)間窗口(window),那么0~10s,10~20s都是一個(gè)窗口,以0~10s為例,0位start-time,10為end-time。假如有4個(gè)數(shù)據(jù)的event-time分別是8(A),12.5(B),9(C),13.5(D),我們設(shè)置Watermarks為當(dāng)前所有到達(dá)數(shù)據(jù)event-time的最大值減去延遲值3.5秒。

當(dāng)A到達(dá)的時(shí)候,Watermarks為max{8}-3.5=8-3.5 = 4.5 < 10,不會(huì)觸發(fā)計(jì)算;
當(dāng)B到達(dá)的時(shí)候,Watermarks為max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不會(huì)觸發(fā)計(jì)算;
當(dāng)C到達(dá)的時(shí)候,Watermarks為max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不會(huì)觸發(fā)計(jì)算;
當(dāng)D到達(dá)的時(shí)候,Watermarks為max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,觸發(fā)計(jì)算;
觸發(fā)計(jì)算的時(shí)候,會(huì)將AC(因?yàn)樗麄兌夹∮?0)都計(jì)算進(jìn)去。
通過上面這種方式,我們就將遲到的C計(jì)算進(jìn)去了。這里的延遲3.5s是我們假設(shè)一個(gè)數(shù)據(jù)到達(dá)的時(shí)候,比他早3.5s的數(shù)據(jù)肯定也都到達(dá)了,這個(gè)是需要根據(jù)經(jīng)驗(yàn)推算的,加入D到達(dá)以后有到達(dá)了一個(gè)E,event-time=6,但是由于0~10的時(shí)間窗口已經(jīng)開始計(jì)算了,所以E就丟了。


猜你喜歡:

Flink cep庫在處理事件時(shí)間延遲問題

Scala重寫父類有哪些注意事項(xiàng)?重寫代碼演示

Apache Hive Spark Streaming工作原理是什么?

傳智教育大數(shù)據(jù)培訓(xùn)課程

0 分享到:
和我們在線交談!