亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.430618.com 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

一、背景

之前在做電商相關(guān)業(yè)務(wù)的時(shí)候,有一個(gè)常見的需求場(chǎng)景是:用戶下單之后,超過半小時(shí)不支付,就取消訂單。現(xiàn)在我們?cè)谔詫毦〇|買東西,或者通過美團(tuán)點(diǎn)外賣,下單之后,如果不在指定時(shí)間內(nèi)支付,訂單也會(huì)取消。 那么,如何實(shí)現(xiàn)這樣的超時(shí)取消邏輯呢,通過消息隊(duì)列的延時(shí)消息,是一個(gè)非常穩(wěn)定的實(shí)現(xiàn)方案。

RocketMQ 就提供了這樣的延時(shí)消息功能,producer 端在發(fā)送消息時(shí),設(shè)置延遲級(jí)別,從秒級(jí)到分鐘小時(shí)等等。消息在發(fā)送之后,會(huì)在消息隊(duì)列的服務(wù)器進(jìn)行存儲(chǔ)。等過了設(shè)定的延遲時(shí)間之后,消息才會(huì)被consumer端消費(fèi)到。

 

如果我們?cè)谙聠蔚臅r(shí)候,發(fā)送一條設(shè)置延時(shí)30分鐘的消息,這條消息會(huì)在30分鐘之后被下游系統(tǒng)消費(fèi),然后判斷訂單有沒有支付,如果沒有支付,則取消訂單。那么這樣,通過消息隊(duì)列就完成了一個(gè)延遲取消的邏輯了。

二、原理

設(shè)置延時(shí)

先來看一下如何設(shè)置消息的延時(shí) 消息體可以通過setDelayTimeLevel方法來設(shè)置延時(shí)級(jí)別

public void produce() {
    Message msg = new Message("TopicTest",
        "TagA",
        "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    msg.setDelayTimeLevel(1)
    SendResult sendResult = producer.send(msg);
}

public void consume() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

其實(shí)是將延遲信息存到 Message 的 property 中(property是一個(gè)保存meta信息的hashmap)

public void setDelayTimeLevel(int level) {
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}

void putProperty(final String name, final String value) {
    if (null == this.properties) {
        this.properties = new HashMap<String, String>();
    }

    this.properties.put(name, value);
}

之后,broker收到 message之后,會(huì)根據(jù) message 中設(shè)置的延時(shí)級(jí)別進(jìn)行處理 可以看看延時(shí)級(jí)別的具體情況: 一共分為18個(gè)級(jí)別(1-18),對(duì)應(yīng)時(shí)間從1s到2h

public class MessageStoreConfig {
    
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

}

那么整個(gè)系統(tǒng)是怎么做到讓consumer在設(shè)定的延時(shí)之后,開始消費(fèi)指定消息的呢?

不得不說,RocketMQ 的設(shè)計(jì)還是挺巧妙的,我們接著往下看。

消息預(yù)存

對(duì)于broker收到的延時(shí)消息,并不是和普通消息一樣,進(jìn)入發(fā)送端指定的topic中, 而是進(jìn)入專門的延時(shí)topic中,延時(shí)topic有18條隊(duì)列(queueId 編號(hào)0-17),queueId 和 delayLevel 的關(guān)系是 queueId + 1 = delayLevel,是一一對(duì)應(yīng)的。所以計(jì)算延時(shí)消息的待執(zhí)行時(shí)間deliverTimestamp之后,會(huì)將消息存入對(duì)應(yīng)延時(shí)級(jí)別的隊(duì)列中。

// 如果是延遲消息
if (msg.getDelayTimeLevel() > 0) {
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }
    
    // 重設(shè)延遲消息的topic和queueId,topic為指定的RMQ_SYS_SCHEDULE_TOPIC
    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    ...
    // 將實(shí)際的指定topic和queueId進(jìn)行存入property,進(jìn)行備份
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));        
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

之后,會(huì)由ScheduleMessageService來進(jìn)行任務(wù)處理。ScheduleMessageService是broker啟動(dòng)時(shí)就開始執(zhí)行的,用來處理延遲隊(duì)列中的消息,處理的邏輯如下所示。

public class ScheduleMessageService extends ConfigManager {

    // key: delayLevel | value: delayTimeMillis 
    private final ConcurrentMap<Integer, Long> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);

    public void start() {
        // 創(chuàng)建一個(gè)Timer,用于執(zhí)行定時(shí)任務(wù)
        this.timer = new Timer("ScheduleMessageTimerThread", true);
            
        // 這里對(duì)每個(gè)delayLevel的queue都創(chuàng)建一個(gè)DeliverDelayedMessageTimerTask,
        // 用來處理對(duì)應(yīng)queue中的消息
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
    }
    
}

ScheduleMessageService啟動(dòng)之后,會(huì)根據(jù)延時(shí)隊(duì)列的數(shù)目創(chuàng)建一一對(duì)應(yīng)的
DeliverDelayedMessageTimerTask,然后周期執(zhí)行。該類繼承自TimerTask,是JDK的工具類,用于執(zhí)行定時(shí)任務(wù),原理可以參考這篇文章 如何實(shí)現(xiàn)定時(shí)任務(wù)- JAVA Timer/TimerTask 源碼原理解析

消息轉(zhuǎn)投

可以看到
DeliverDelayedMessageTimerTask實(shí)現(xiàn)的 run 方法,主要邏輯都在executeOnTimeup方法中,從對(duì)應(yīng)的延遲隊(duì)列中取出時(shí)間已到的 message,發(fā)送到 message 對(duì)應(yīng)原始topic的隊(duì)列中。只要隊(duì)列沒有發(fā)生消費(fèi)積壓,message 就會(huì)馬上被消費(fèi)了。(這部分的代碼實(shí)現(xiàn)比較復(fù)雜,感興趣可以去看對(duì)應(yīng)的源碼)

class DeliverDelayedMessageTimerTask extends TimerTask {
    private final int delayLevel;
    private final long offset;

    public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
        this.delayLevel = delayLevel;
        this.offset = offset;
    }

    @Override
    public void run() {
        try {
            if (isStarted()) {
                this.executeOnTimeup();
            }
        } catch (Exception e) {
            // XXX: warn and notify me
            log.error("ScheduleMessageService, executeOnTimeup exception", e);
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
        }
    }

    public void executeOnTimeup() {
        ConsumeQueue cq =
            ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                delayLevel2QueueId(delayLevel));

        long failScheduleOffset = offset;

        if (cq != null) {
            
            // 這部分是核心邏輯,實(shí)現(xiàn)的是 從延時(shí)消息隊(duì)列中取出 deliverTimestamp - now <= 0 的消息,
            // 將消息從延時(shí)queue移到原本指定Topic的queue中,這些消息就馬上會(huì)被consumer消費(fèi)。
        } 

        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
            failScheduleOffset), DELAY_FOR_A_WHILE);
    }
}

總體的原理示意圖,如下所示:

 

broker 在接收到延時(shí)消息的時(shí)候,會(huì)將延時(shí)消息存入到延時(shí)TOPIC的隊(duì)列中,然后ScheduleMessageService中,每個(gè) queue 對(duì)應(yīng)的定時(shí)任務(wù)會(huì)不停地被執(zhí)行,檢查 queue 中哪些消息已到設(shè)定時(shí)間,然后轉(zhuǎn)發(fā)到消息的原始TOPIC,這些消息就會(huì)被各自的 producer 消費(fèi)了。

三、拓展-消費(fèi)重試

平常在使用RocketMQ的時(shí)候,一般會(huì)依賴consumer的消費(fèi)重試功能。 而consumer端的消費(fèi)重試,其實(shí)也是通過這個(gè)和延時(shí)隊(duì)列差不多的原理來實(shí)現(xiàn)的。

public void consume() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            // 這里如果返回RECONSUME_LATER,就會(huì)重試消費(fèi)
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

RocketMQ規(guī)定,以下三種情況統(tǒng)一按照消費(fèi)失敗處理并會(huì)發(fā)起重試。

  • 業(yè)務(wù)消費(fèi)方返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  • 業(yè)務(wù)消費(fèi)方返回null
  • 業(yè)務(wù)消費(fèi)方主動(dòng)/被動(dòng)拋出異常

業(yè)務(wù)代碼中,一般會(huì)利用重試功能去做下游邏輯的重試。而RocketMQ的重試并不是固定時(shí)間間隔重復(fù)進(jìn)行,二是采取的退避式重試,重試的時(shí)間間隔會(huì)不斷變長(zhǎng)。 這個(gè)時(shí)間間隔,和設(shè)置delayLevel的延時(shí)類似。

Consumer客戶端會(huì)通過processConsumeResult方法處理每一條消息的消費(fèi)結(jié)果,如果判斷需要進(jìn)行重試,則會(huì)通過sendMessageBack方法將消息發(fā)送到broker,重試消息會(huì)帶上已重試次數(shù)的信息。

broker收到消息之后,SendMessageProcessor會(huì)對(duì)重試消息進(jìn)行處理,設(shè)置topic為RETRY_TOPIC,具體邏輯如下:

public class SendMessageProcessor
    extends AbstractSendMessageProcessor
    implements.NETtyRequestProcessor {

    private RemotingCommand asyncConsumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {

        // 給重試消息設(shè)置新的topic
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

        // 根據(jù)已經(jīng)發(fā)生重試的次數(shù)確定delayLevel
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);

        // 重試次數(shù)+1
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        // 存儲(chǔ)消息
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

        // ...
    }   
}

存儲(chǔ)消息的時(shí)候,CommitLog.putMessage方法內(nèi)部判斷如果設(shè)置了delayLevel,就會(huì)重設(shè)topic為SCHEDULE_TOPIC,然后將消息存儲(chǔ)到延時(shí)隊(duì)列中,后續(xù)就是和ScheduleMessageService的邏輯相同。

整個(gè)消息重試的邏輯示意圖如下所示:

 

如圖所示

  1. Consumer在消費(fèi)的時(shí)候,都會(huì)訂閱指定的TOPIC-NORMAL_TOPIC和該ConsumerGroup對(duì)應(yīng)的重試TOPIC-RETRY_GROUP1_TOPIC,同時(shí)消費(fèi)來自這兩個(gè)topic中的消息。
  2. 當(dāng)發(fā)生消費(fèi)失敗后,Client端會(huì)調(diào)用sendMessageBack方法將失敗消息發(fā)送回broker。
  3. broker端的SendMessageProcessor會(huì)根據(jù)當(dāng)前的重試次數(shù)確定延時(shí)級(jí)別,將消息存入延時(shí)隊(duì)列-SCHEDULE_TOPIC中。
  4. ScheduleMessageService會(huì)將到期的消息重新發(fā)送到重試TOPIC-RETRY_GROUP1_TOPIC中,這個(gè)時(shí)候消息被Consumer消費(fèi),就完成了整個(gè)重試過程。

可以對(duì)比之前的延時(shí)消息流程去看,其實(shí)重試消息就是將失敗的消息當(dāng)作延時(shí)消息進(jìn)行處理,只不過最后投入的是專門的重試消息隊(duì)列中。

四、總結(jié)

延時(shí)消息都是非常日常業(yè)務(wù)使用中很重要的功能,而RocketMQ通過時(shí)間片分級(jí)+多隊(duì)列+定時(shí)任務(wù),就實(shí)現(xiàn)了這樣的功能,設(shè)計(jì)上是很巧妙的。并且消費(fèi)重試采用退避式的策略,重試時(shí)間的梯度剛好與延時(shí)消息策略一致,這樣就可以直接利用延時(shí)隊(duì)列去完成消息重試的功能,從策略上來說非常合理(消息消費(fèi)重復(fù)失敗,在短時(shí)間內(nèi)重試成功的可能性比較低),并且復(fù)用了底層代碼,這些是值得去學(xué)習(xí)和借鑒的。

分享到:
標(biāo)簽:RocketMQ
用戶無頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定