更新時間:2018年10月24日15時50分 來源:傳智播客 瀏覽次數(shù):
一、什么是延時隊(duì)列
所謂延時隊(duì)列是指消息push到隊(duì)列后,監(jiān)聽的消費(fèi)者不能第一時間獲取消息,需要等到指定時間才能消費(fèi)。
一般在業(yè)務(wù)里面需要對某些消息做定時發(fā)送,不想走定時任務(wù)或者是用戶下單之后多長時間自動失效類似的場景可以考慮通過延時隊(duì)列實(shí)現(xiàn)。
二、RabbitMQ實(shí)現(xiàn)
MQ本身并不支持直接的延時隊(duì)列實(shí)現(xiàn),但是我們可以通過RabbitMQ的消息TTL和Dead Letter規(guī)則來實(shí)現(xiàn)
Time TO Live (TTL): RabbitMQ可以針對Queue設(shè)置x-expires 或者 針對Message設(shè)置 x-message-ttl,來控制消息的生存時間
Dead Letter 死信 RabbitMQ官網(wǎng)這樣定義死信消息:
. 消息被拒絕(basic.reject或basic.nack)并且requeue=false.
. 消息TTL過期
隊(duì)列達(dá)到最大長度(隊(duì)列滿了,無法再添加數(shù)據(jù)到mq中)
Dead Letter Exchanges(DLX)死信交換機(jī) MQ默認(rèn)的死信消息是丟棄的,但是我們可以通過設(shè)置以下兩個屬性讓死信消息轉(zhuǎn)發(fā)到我們指定的隊(duì)列。
x-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange
x-dead-letter-routing-key:出現(xiàn)dead letter之后將dead letter重新按照指定的routing-key發(fā)送
延時隊(duì)列實(shí)現(xiàn): 了解了MQ隊(duì)列的TTL和Dead Letter之后,我們就可以通過這兩個特性來實(shí)現(xiàn),首先我們通過設(shè)置消息或者隊(duì)列的TTL來設(shè)置消息在指定時間后成為死信,再設(shè)置死信消息的路由轉(zhuǎn)發(fā)規(guī)則到特定隊(duì)列,消費(fèi)者通過監(jiān)聽這個特定隊(duì)列就能實(shí)現(xiàn)延時隊(duì)列的效果。
代碼實(shí)現(xiàn)
生產(chǎn)者發(fā)送消息:ttlQueue存放過期時間的隊(duì)列,deadLetterQueue死信轉(zhuǎn)發(fā)隊(duì)列,seconds是過期時間
public static void sendTTLMsg(String ttlQueue, String deadLetterQueue, Object msg, Integer seconds) {
MqSender.getInstance().setHost(RABBIT_MQ_HOST);
// 獲取到連接以及MQ通道
Connection connection;
try {
connection = MqSender.getInstance().newConnection();
// 從連接中創(chuàng)建通道
Channel channel = connection.createChannel();
// 配置
Map
args.put("x-dead-letter-exchange", "");
args.put("x-dead-letter-routing-key", deadLetterQueue);
channel.queueDeclare(deadLetterQueue, true, false, false, null);
channel.queueDeclare(ttlQueue, true, false, false, args);
// 發(fā)送消息
channel.basicPublish("", ttlQueue, new AMQP.BasicProperties.Builder().expiration(String.valueOf(seconds)).build(), MAPPER.writeValueAsBytes(msg));
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
消費(fèi)者通過監(jiān)聽deadLetterQueue來實(shí)現(xiàn)延時消息監(jiān)聽
三、 延時隊(duì)列的問題
通過我們測試發(fā)現(xiàn),這種方式實(shí)現(xiàn)的延時隊(duì)列,在隊(duì)列設(shè)置TTL的情況下是可以正常的,但是如果根據(jù)消息設(shè)置了不同的TTL,就會有問題,因?yàn)镸Q本質(zhì)上還是消息隊(duì)列中間件,隊(duì)列是遵循先進(jìn)先出的,如果有兩個消息先后入隊(duì),但是后入隊(duì)的消息TTL小于前面的消息,它必須等待之前的消息被消費(fèi)完后才能挪到隊(duì)列頭部,這樣不同延時消息就會出現(xiàn)問題。
通過RabbitMQ官網(wǎng)的文檔也介紹了這個問題:
Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered)
所以我才稱之為MQ的偽延時隊(duì)列,這種延時隊(duì)列在消息TTL不同的情況下并不能實(shí)現(xiàn)真正的延時消費(fèi)。
四、解決RabbitMQ的偽延時方案
既然RabbitMQ無法支持不同TTL消息的延時消費(fèi),那么如果我們要實(shí)現(xiàn)這種功能,有什么方案呢,在實(shí)際業(yè)務(wù)開發(fā)中,我們有這樣的解決方案:
首先我們會創(chuàng)建多級延時消費(fèi)隊(duì)列(比如兩分鐘,三十分鐘,一天三種,具體可以根據(jù)業(yè)務(wù)量和訪問量還有時間精確度來劃分,這里的兩分鐘、三十分鐘是指隊(duì)列統(tǒng)一的TTL),push消費(fèi)隊(duì)列的時候,會根據(jù)需要延時的時間,丟到不同的消費(fèi)隊(duì)列,比如小于三十分鐘的我們push到兩分鐘隊(duì)列,三十分鐘到一天的放入三十分鐘隊(duì)列,超過一天的放入一天隊(duì)列,在死信隊(duì)列的監(jiān)聽器做同樣的判斷,如果是小于等于當(dāng)前時間消息的,立馬消費(fèi),否則按照上述規(guī)則繼續(xù)循環(huán)到不同的延時隊(duì)列
這種方案解決了多級延時消費(fèi)的問題,并且能夠較大程度地避免了消息的重復(fù)循環(huán),降低MQ的壓力,但是缺點(diǎn)也比較明顯,因?yàn)樽畹褪莾煞昼姷难訒r,理論上來說最多會有兩分鐘的誤差,如果對時間要求性比較高的,可以適當(dāng)調(diào)低最低一級別的延時TTL,比如一分鐘或者三十秒
類似代碼如下:cts是需要消費(fèi)掉的時間戳
long now = System.currentTimeMillis();
long cts = Long.valueOf(feedComment.getCts());
if (cts - now <= 30 * 60 * 1000) {
MqSender.sendTTLMsg(MqConstants.FEED_COMMENT_DELAY_QUEUE_2MIN, MqConstants.FEED_COMMENT_AUTO_POST_QUEUE, feedComment, 2 * 60);
} else if (cts - now <= 24 * 60 * 60 * 1000) {
MqSender.sendTTLMsg(MqConstants.FEED_COMMENT_DELAY_QUEUE_30MIN, MqConstants.FEED_COMMENT_AUTO_POST_QUEUE, feedComment, 30 * 60);
} else {
MqSender.sendTTLMsg(MqConstants.FEED_COMMENT_DELAY_QUEUE_24HOUR, MqConstants.FEED_COMMENT_AUTO_POST_QUEUE, feedComment, 24 * 60 * 60);
}
北京校區(qū)