教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

Flink cep庫在處理事件時間延遲問題[大數(shù)據(jù)培訓]

更新時間:2019年10月16日15時13分 來源:傳智播客 瀏覽次數(shù):

1、簡介

Flink CEP是在flink之上實現(xiàn)的復雜事件處理(CEP)庫,它允許我們在事件流中檢測事件的模式,讓我們有機會掌握數(shù)據(jù)中重要的事項。

本文章主要是介紹了flink cep中可用的api調(diào)用,首先介紹Pattern API,它允許你指定要在事件流中檢測的模式,并介紹匹配事件并對其進行操作。最后分析下CEP庫在處理事件時間延遲問題。

2、使用步驟

(1)首先我們需要引入cep的依賴


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep_2.11</artifactId>
  <version>1.5.0</version>
</dependency>

(2)確定equals()和hashcode()方法

如果使用CEP,需要我們在datastream中的事件實現(xiàn)正確的equals()和hashcode()方法,因為Flink CEP使用他們來比較和匹配事件。

簡單demo代碼:


val input: DataStream[Event] = ...

val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
  .followedBy("end").where(_.getName == "end")

val patternStream = CEP.pattern(input, pattern)

val result: DataStream[Alert] = patternStream.select(createAlert(_))

3、Pattern API

Pattern API允許你定義要從輸入流中提取的復雜模式序列。

每個復雜模式序列都是由多個簡單模式組成,簡單模式就是尋找具有相同屬性的單個事件的模式,我們可以先定義一些簡單的模式,然后組合成復雜的序列模式。

可以將模式序列視為此類模式的結(jié)構(gòu)圖,基于用戶指定的條件從一個模式轉(zhuǎn)換到下一個模式,例如:

匹配的是一系列輸入事件,通過一系列有效的模式轉(zhuǎn)換訪問復雜模式圖中的所有模式。注意每個模式必須具有唯一的名稱,以便后續(xù)可以使用該名稱來標識匹配的事件。模式名稱中不能包含字符”:”。

下面我們首先介紹如何定義單個模式,然后再將各個模式組合到復雜模式中。【推薦了解大數(shù)據(jù)培訓課程

單個模式

Pattern可以是單個,也可以是循環(huán)模式,單個模式接收單個事件,而循環(huán)模式可以接收多個事件,在模式匹配符號中,模式“a b + c?d”(或“a”,后跟一個或多個“b”,可選地后跟“c”,后跟“d”),a,c ?,和d是單例模式,而b +是循環(huán)模式。

默認情況下,模式是單個模式,可以使用Quantifiers將其轉(zhuǎn)換為循環(huán)模式。每個模式可以有一個或多個條件,基于它接收的事件。

Quantifiers

在FlinkCEP中,可以使用以下方法指定循環(huán)模式:pattern.oneOrMore(),用于期望一個或多個事件發(fā)生的模式(例如之前提到的b+);用于期望給定類型事件的特定出現(xiàn)次數(shù)的模式,對于名為start的模式,以下是有效的Quantifiers:


// expecting 4 occurrences
 start.times(4);

 // expecting 0 or 4 occurrences
 start.times(4).optional();

 // expecting 2, 3 or 4 occurrences
 start.times(2, 4);

 // expecting 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).greedy();

 // expecting 0, 2, 3 or 4 occurrences
 start.times(2, 4).optional();

 // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).optional().greedy();

 // expecting 1 or more occurrences
 start.oneOrMore();

 // expecting 1 or more occurrences and repeating as many as possible
 start.oneOrMore().greedy();

 // expecting 0 or more occurrences
 start.oneOrMore().optional();

 // expecting 0 or more occurrences and repeating as many as possible
 start.oneOrMore().optional().greedy();

 // expecting 2 or more occurrences
 start.timesOrMore(2);

 // expecting 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).greedy();

 // expecting 0, 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).optional().greedy();


Conditions條件

每個模式中,從一個模式轉(zhuǎn)到下一個模式,可以指定其他條件,我們可以使用下面這些條件:

(1)傳入事件的屬性,例如其值應大于5,或者大于先前接收的事件的平均值;

(2)匹配事件的連續(xù)性,例如檢測模式a,b,c序列中不能有任何非匹配事件。

Conditions on Properties關于屬性的條件

可以通過pattern.where(),pattern.or()或pattern.until()方法指定事件屬性的條件,條件可以是iterativeConditions或SimpleConditions.

(1)迭代條件

這是最常見的條件類型,你可以指定一個條件,該條件基于先前接收的事件的屬性或器子集的統(tǒng)計信息來接收后續(xù)事件。

下面代碼說的是:如果名稱以”foo”開頭同時如果該模式的先前接收的事件的價格總和加上當前事件的價格不超過該值5.0,則迭代條件接收名為”middle”的模式的下一個事件:迭代條件可以很強大,尤其是與循環(huán)模式相結(jié)合,例如:oneOrMore();


middle.oneOrMore()
    .subtype(classOf[SubEvent])
    .where(
        (value, ctx) => {
            lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
            value.getName.startsWith("foo") && sum + value.getPrice < 5.0
        }
    )

注意對context.getEventsForPattern()的調(diào)用將為給定潛在匹配項查找所有先前接收的事件,此操作代價可能會變化巨大,因此應盡量減少其使用。

(2)簡單條件

這種類型的條件時擴展了前面提到的IterativeCondition類,并且僅根據(jù)事件本身的屬性決定是否接收事件:


start.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return value.getName().startsWith("foo");
    }});

此外還可以通過pattern.subtype(subclass)方法將接收事件的類型限定為初始事件類型的子類型:


start.where(event => event.getName.startsWith("foo"))

組合條件

如上所示,可以將子類型條件與其他條件組合使用,這適用于所有條件。我們可以通過順序調(diào)用where()來任意組合條件。最終結(jié)果將是各個條件的結(jié)果的邏輯and,要使用or組合條件,可以使用or()方法,如下所示:


pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)

停止條件

在循環(huán)模式(oneOrMore()和oneOrMore().optional())的情況下,還可以指定停止條件,例如:接收值大于5的事件,直到其值的總和小于50.

我們看個例子來更好的理解:

給定模式:(a+ until b),b之前,要出現(xiàn)一個或者多個a,

給定輸入的序列:a1,c,a2,b,a3

輸出結(jié)果:{a1 a2}{a1}{a2}{a3}

我們可以看到{a1,a2,a3},{a2,a3}兩個并沒有輸出,這就是停止條件的作用。

連續(xù)事件的條件

Flink CEP支持事件之間以一下形式連續(xù):

嚴格連續(xù)性:希望所有匹配事件一個接一個的出現(xiàn),中間沒有任何不匹配的事件;

寬松連續(xù)性:忽略匹配的事件之間出現(xiàn)不匹配事件,不能忽略兩個事件之間的匹配事件。

非確定性輕松連續(xù)性:進一步放寬連續(xù)性,允許忽略某些匹配事件的其它匹配。

為了解釋上面的內(nèi)容,我們舉個例子。假如有個模式序列"a+ b",輸入序列"a1,c,a2,b",不同連續(xù)條件下有不同的區(qū)別:

嚴格連續(xù)性:{a2 b} - 由于c的存在導致a1被廢棄

寬松連續(xù)性:{a1,b}和{a1 a2 b} - c被忽略

非確定性寬松連續(xù)性:{a1 b}, {a2 b}, 和 {a1 a2 b}

對于循環(huán)模式(例如oneOrMore()和times()),默認是寬松的連續(xù)性。 如果你想要嚴格的連續(xù)性,你必須使用consecutive()顯式指定它, 如果你想要非確定性的松弛連續(xù)性,你可以使用allowCombinations()方法。


組合模式

簡介

已經(jīng)了解了單個模式的樣子,現(xiàn)在是時候看看如何將它們組合成一個完整的模式序列。

模式序列必須以初始模式開始,如下所示:

Pattern start = Pattern.begin("start");

接下來,您可以通過指定它們之間所需的連續(xù)條件,為模式序列添加更多模式。 在上一節(jié)中,我們描述了Flink支持的不同鄰接模式,即嚴格,寬松和非確定性寬松,以及如何在循環(huán)模式中應用它們。 要在連續(xù)模式之間應用它們,可以使用:

next() 對應嚴格, followedBy() 對應寬松連續(xù)性 followedByAny() 對應非確定性寬松連續(xù)性

亦或

notNext() 如果不希望一個事件類型緊接著另一個類型出現(xiàn)。 notFollowedBy() 不希望兩個事件之間任何地方出現(xiàn)該事件。 注意 模式序列不能以notFollowedBy()結(jié)束。 注意 NOT模式前面不能有可選模式。


// strict contiguity
Pattern<Event, ?> strict = start.next("middle").where(...);
// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
// NOT pattern with strict contiguity
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
// NOT pattern with relaxed contiguity
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);

寬松連續(xù)性指的是僅第一個成功匹配的事件會被匹配到,然而非確定性寬松連續(xù)性,相同的開始會有多個匹配結(jié)果發(fā)出。距離,如果一個模式是"a b",給定輸入序列是"a c b1 b2"。對于不同連續(xù)性會有不同輸出。

a和b之間嚴格連續(xù)性,將會返回{},也即是沒有匹配。因為c的出現(xiàn)導致a,拋棄了。

a和b之間寬松連續(xù)性,返回的是{a,b1},因為寬松連續(xù)性將會拋棄為匹配成功的元素,直至匹配到下一個要匹配的事件。

a和b之間非確定性寬松連續(xù)性,返回的是{a,b1},{a,b2}。

也可以為模式定義時間約束。 例如,可以通過pattern.within()方法定義模式應在10秒內(nèi)發(fā)生。 時間模式支持處理時間和事件時間。 注意模式序列只能有一個時間約束。 如果在不同的單獨模式上定義了多個這樣的約束,則應用最小的約束。

next.within(Time.seconds(10));

可以為begin,followBy,followByAny和next定義一個模式序列作為條件。模式序列將被邏輯地視為匹配條件,而且將返回GroupPattern并且 可對GroupPattern使用oneOrMore(),times(#ofTimes),times(#fromTimes,#toTimes),optional(),consecutive(), allowCombinations()等方法。

0 分享到:
和我們在線交談!