一、背景
之前在做電商相關(guān)業(yè)務(wù)的時(shí)候,有一個(gè)常見的需求場(chǎng)景是:用戶下單之后,超過半小時(shí)不支付,就取消訂單。現(xiàn)在我們?cè)谔詫毦〇|買東西,或者通過美團(tuán)點(diǎn)外賣,下單之后,如果不在指定時(shí)間內(nèi)支付,訂單也會(huì)取消。 那么,如何實(shí)現(xiàn)這樣的超時(shí)取消邏輯呢,通過消息隊(duì)列的延時(shí)消息,是一個(gè)非常穩(wěn)定的實(shí)現(xiàn)方案。
RocketMQ 就提供了這樣的延時(shí)消息功能,producer 端在發(fā)送消息時(shí),設(shè)置延遲級(jí)別,從秒級(jí)到分鐘小時(shí)等等。消息在發(fā)送之后,會(huì)在消息隊(duì)列的服務(wù)器進(jìn)行存儲(chǔ)。等過了設(shè)定的延遲時(shí)間之后,消息才會(huì)被consumer端消費(fèi)到。
如果我們?cè)谙聠蔚臅r(shí)候,發(fā)送一條設(shè)置延時(shí)30分鐘的消息,這條消息會(huì)在30分鐘之后被下游系統(tǒng)消費(fèi),然后判斷訂單有沒有支付,如果沒有支付,則取消訂單。那么這樣,通過消息隊(duì)列就完成了一個(gè)延遲取消的邏輯了。
二、原理
設(shè)置延時(shí)
先來看一下如何設(shè)置消息的延時(shí) 消息體可以通過setDelayTimeLevel方法來設(shè)置延時(shí)級(jí)別
public void produce() {
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setDelayTimeLevel(1)
SendResult sendResult = producer.send(msg);
}
public void consume() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
其實(shí)是將延遲信息存到 Message 的 property 中(property是一個(gè)保存meta信息的hashmap)
public void setDelayTimeLevel(int level) {
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
void putProperty(final String name, final String value) {
if (null == this.properties) {
this.properties = new HashMap<String, String>();
}
this.properties.put(name, value);
}
之后,broker收到 message之后,會(huì)根據(jù) message 中設(shè)置的延時(shí)級(jí)別進(jìn)行處理 可以看看延時(shí)級(jí)別的具體情況: 一共分為18個(gè)級(jí)別(1-18),對(duì)應(yīng)時(shí)間從1s到2h
public class MessageStoreConfig {
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}
那么整個(gè)系統(tǒng)是怎么做到讓consumer在設(shè)定的延時(shí)之后,開始消費(fèi)指定消息的呢?
不得不說,RocketMQ 的設(shè)計(jì)還是挺巧妙的,我們接著往下看。
消息預(yù)存
對(duì)于broker收到的延時(shí)消息,并不是和普通消息一樣,進(jìn)入發(fā)送端指定的topic中, 而是進(jìn)入專門的延時(shí)topic中,延時(shí)topic有18條隊(duì)列(queueId 編號(hào)0-17),queueId 和 delayLevel 的關(guān)系是 queueId + 1 = delayLevel,是一一對(duì)應(yīng)的。所以計(jì)算延時(shí)消息的待執(zhí)行時(shí)間deliverTimestamp之后,會(huì)將消息存入對(duì)應(yīng)延時(shí)級(jí)別的隊(duì)列中。
// 如果是延遲消息
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 重設(shè)延遲消息的topic和queueId,topic為指定的RMQ_SYS_SCHEDULE_TOPIC
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
...
// 將實(shí)際的指定topic和queueId進(jìn)行存入property,進(jìn)行備份
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
之后,會(huì)由ScheduleMessageService來進(jìn)行任務(wù)處理。ScheduleMessageService是broker啟動(dòng)時(shí)就開始執(zhí)行的,用來處理延遲隊(duì)列中的消息,處理的邏輯如下所示。
public class ScheduleMessageService extends ConfigManager {
// key: delayLevel | value: delayTimeMillis
private final ConcurrentMap<Integer, Long> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);
public void start() {
// 創(chuàng)建一個(gè)Timer,用于執(zhí)行定時(shí)任務(wù)
this.timer = new Timer("ScheduleMessageTimerThread", true);
// 這里對(duì)每個(gè)delayLevel的queue都創(chuàng)建一個(gè)DeliverDelayedMessageTimerTask,
// 用來處理對(duì)應(yīng)queue中的消息
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
}
}
ScheduleMessageService啟動(dòng)之后,會(huì)根據(jù)延時(shí)隊(duì)列的數(shù)目創(chuàng)建一一對(duì)應(yīng)的
DeliverDelayedMessageTimerTask,然后周期執(zhí)行。該類繼承自TimerTask,是JDK的工具類,用于執(zhí)行定時(shí)任務(wù),原理可以參考這篇文章 如何實(shí)現(xiàn)定時(shí)任務(wù)- JAVA Timer/TimerTask 源碼原理解析
消息轉(zhuǎn)投
可以看到
DeliverDelayedMessageTimerTask實(shí)現(xiàn)的 run 方法,主要邏輯都在executeOnTimeup方法中,從對(duì)應(yīng)的延遲隊(duì)列中取出時(shí)間已到的 message,發(fā)送到 message 對(duì)應(yīng)原始topic的隊(duì)列中。只要隊(duì)列沒有發(fā)生消費(fèi)積壓,message 就會(huì)馬上被消費(fèi)了。(這部分的代碼實(shí)現(xiàn)比較復(fù)雜,感興趣可以去看對(duì)應(yīng)的源碼)
class DeliverDelayedMessageTimerTask extends TimerTask {
private final int delayLevel;
private final long offset;
public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
this.delayLevel = delayLevel;
this.offset = offset;
}
@Override
public void run() {
try {
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
public void executeOnTimeup() {
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
// 這部分是核心邏輯,實(shí)現(xiàn)的是 從延時(shí)消息隊(duì)列中取出 deliverTimestamp - now <= 0 的消息,
// 將消息從延時(shí)queue移到原本指定Topic的queue中,這些消息就馬上會(huì)被consumer消費(fèi)。
}
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
}
總體的原理示意圖,如下所示:
broker 在接收到延時(shí)消息的時(shí)候,會(huì)將延時(shí)消息存入到延時(shí)TOPIC的隊(duì)列中,然后ScheduleMessageService中,每個(gè) queue 對(duì)應(yīng)的定時(shí)任務(wù)會(huì)不停地被執(zhí)行,檢查 queue 中哪些消息已到設(shè)定時(shí)間,然后轉(zhuǎn)發(fā)到消息的原始TOPIC,這些消息就會(huì)被各自的 producer 消費(fèi)了。
三、拓展-消費(fèi)重試
平常在使用RocketMQ的時(shí)候,一般會(huì)依賴consumer的消費(fèi)重試功能。 而consumer端的消費(fèi)重試,其實(shí)也是通過這個(gè)和延時(shí)隊(duì)列差不多的原理來實(shí)現(xiàn)的。
public void consume() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 這里如果返回RECONSUME_LATER,就會(huì)重試消費(fèi)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
RocketMQ規(guī)定,以下三種情況統(tǒng)一按照消費(fèi)失敗處理并會(huì)發(fā)起重試。
- 業(yè)務(wù)消費(fèi)方返回ConsumeConcurrentlyStatus.RECONSUME_LATER
- 業(yè)務(wù)消費(fèi)方返回null
- 業(yè)務(wù)消費(fèi)方主動(dòng)/被動(dòng)拋出異常
業(yè)務(wù)代碼中,一般會(huì)利用重試功能去做下游邏輯的重試。而RocketMQ的重試并不是固定時(shí)間間隔重復(fù)進(jìn)行,二是采取的退避式重試,重試的時(shí)間間隔會(huì)不斷變長(zhǎng)。 這個(gè)時(shí)間間隔,和設(shè)置delayLevel的延時(shí)類似。
Consumer客戶端會(huì)通過processConsumeResult方法處理每一條消息的消費(fèi)結(jié)果,如果判斷需要進(jìn)行重試,則會(huì)通過sendMessageBack方法將消息發(fā)送到broker,重試消息會(huì)帶上已重試次數(shù)的信息。
broker收到消息之后,SendMessageProcessor會(huì)對(duì)重試消息進(jìn)行處理,設(shè)置topic為RETRY_TOPIC,具體邏輯如下:
public class SendMessageProcessor
extends AbstractSendMessageProcessor
implements.NETtyRequestProcessor {
private RemotingCommand asyncConsumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
// 給重試消息設(shè)置新的topic
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
// 根據(jù)已經(jīng)發(fā)生重試的次數(shù)確定delayLevel
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
// 重試次數(shù)+1
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
// 存儲(chǔ)消息
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
// ...
}
}
存儲(chǔ)消息的時(shí)候,CommitLog.putMessage方法內(nèi)部判斷如果設(shè)置了delayLevel,就會(huì)重設(shè)topic為SCHEDULE_TOPIC,然后將消息存儲(chǔ)到延時(shí)隊(duì)列中,后續(xù)就是和ScheduleMessageService的邏輯相同。
整個(gè)消息重試的邏輯示意圖如下所示:
如圖所示
- Consumer在消費(fèi)的時(shí)候,都會(huì)訂閱指定的TOPIC-NORMAL_TOPIC和該ConsumerGroup對(duì)應(yīng)的重試TOPIC-RETRY_GROUP1_TOPIC,同時(shí)消費(fèi)來自這兩個(gè)topic中的消息。
- 當(dāng)發(fā)生消費(fèi)失敗后,Client端會(huì)調(diào)用sendMessageBack方法將失敗消息發(fā)送回broker。
- broker端的SendMessageProcessor會(huì)根據(jù)當(dāng)前的重試次數(shù)確定延時(shí)級(jí)別,將消息存入延時(shí)隊(duì)列-SCHEDULE_TOPIC中。
- ScheduleMessageService會(huì)將到期的消息重新發(fā)送到重試TOPIC-RETRY_GROUP1_TOPIC中,這個(gè)時(shí)候消息被Consumer消費(fèi),就完成了整個(gè)重試過程。
可以對(duì)比之前的延時(shí)消息流程去看,其實(shí)重試消息就是將失敗的消息當(dāng)作延時(shí)消息進(jìn)行處理,只不過最后投入的是專門的重試消息隊(duì)列中。
四、總結(jié)
延時(shí)消息都是非常日常業(yè)務(wù)使用中很重要的功能,而RocketMQ通過時(shí)間片分級(jí)+多隊(duì)列+定時(shí)任務(wù),就實(shí)現(xiàn)了這樣的功能,設(shè)計(jì)上是很巧妙的。并且消費(fèi)重試采用退避式的策略,重試時(shí)間的梯度剛好與延時(shí)消息策略一致,這樣就可以直接利用延時(shí)隊(duì)列去完成消息重試的功能,從策略上來說非常合理(消息消費(fèi)重復(fù)失敗,在短時(shí)間內(nèi)重試成功的可能性比較低),并且復(fù)用了底層代碼,這些是值得去學(xué)習(xí)和借鑒的。






