更新時間:2023年06月23日14時18分 來源:傳智教育 瀏覽次數(shù):
kafka不會像其他JMS隊列那樣需要得到消費者的確認,消費者可以使用kafka來追蹤消息在分區(qū)的位置(偏移量)。
消費者會往一個叫做_consumer_offset的特殊主題發(fā)送消息,消息里包含了每個分區(qū)的偏移量。如果消費者發(fā)生崩潰或有新的消費者加入群組,就會觸發(fā)再均衡。
正常的情況
如果消費者2掛掉以后,會發(fā)生再均衡,消費者2負責的分區(qū)會被其他消費者進行消費,再均衡后不可避免會出現(xiàn)一些問題。
問題一:
如果提交偏移量小于客戶端處理的最后一個消息的偏移量,那么處于兩個偏移量之間的消息就會被重復(fù)處理。
問題二:
如果提交的偏移量大于客戶端的最后一個消息的偏移量,那么處于兩個偏移量之間的消息將會丟失。如果想要解決這些問題,還要知道目前kafka提交偏移量的方式:
提交偏移量的方式有兩種,分別是自動提交偏移量和手動提交。
當enable.auto.commit被設(shè)置為true,提交方式就是讓消費者自動提交偏移量,每隔5秒消費者會自動把從poll()方法接收的最大偏移量提交上去
手動提交 ,當enable.auto.commit被設(shè)置為false可以有以下三種提交方式
提交當前偏移量(同步提交)
同步和異步組合提交
把enable.auto.commit設(shè)置為false,讓應(yīng)用程序決定何時提交偏移量。使用commitSync()提交偏移量,commitSync()將會提交poll返回的最新的偏移量,所以在處理完所有記錄后要確保調(diào)用了commitSync()方法。否則還是會有消息丟失的風險。
只要沒有發(fā)生不可恢復(fù)的錯誤,commitSync()方法會一直嘗試直至提交成功,如果提交失敗也可以記錄到錯誤日志里。
while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); System.out.println(record.key()); try { consumer.commitSync();//同步提交當前最新的偏移量 }catch (CommitFailedException e){ System.out.println("記錄提交失敗的異常:"+e); } } }
手動提交有一個缺點,那就是當發(fā)起提交調(diào)用時應(yīng)用會阻塞。當然我們可以減少手動提交的頻率,但這個會增加消息重復(fù)的概率(和自動提交一樣)。另外一個解決辦法是,使用異步提交的API。
while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); System.out.println(record.key()); } consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { if(e!=null){ System.out.println("記錄錯誤的提交偏移量:"+ map+",異常信息"+e); } } }); }
異步提交也有個缺點,那就是如果服務(wù)器返回提交失敗,異步提交不會進行重試。相比較起來,同步提交會進行重試直到成功或者最后拋出異常給應(yīng)用。異步提交沒有實現(xiàn)重試是因為,如果同時存在多個異步提交,進行重試可能會導(dǎo)致位移覆蓋。
舉個例子,假如我們發(fā)起了一個異步提交commitA,此時的提交位移為2000,隨后又發(fā)起了一個異步提交commitB且位移為3000;commitA提交失敗但commitB提交成功,此時commitA進行重試并成功的話,會將實際上將已經(jīng)提交的位移從3000回滾到2000,導(dǎo)致消息重復(fù)消費。
try { while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); System.out.println(record.key()); } consumer.commitAsync(); } }catch (Exception e){+ e.printStackTrace(); System.out.println("記錄錯誤信息:"+e); }finally { try { consumer.commitSync(); }finally { consumer.close(); } }