1 背景
在金融系統(tǒng)中MQ消息的消息丟失是不允許的,消息的丟失會(huì)導(dǎo)致支付狀態(tài)訂單狀態(tài)出現(xiàn)混亂。接下來聊一下如何保證MQ消息不丟失,以筆者公司使用的RocketMQ為例。
2 RokectMQ消息什么情況下會(huì)丟失?
MQ的消息生成到消費(fèi)主要經(jīng)歷三個(gè)階段:MQ消息生產(chǎn)、RocketMQ Broker存儲(chǔ)消息、消費(fèi)者消息對(duì)應(yīng)的消息。如下圖:
從上圖可以知道消息丟失主要會(huì)發(fā)生在下面幾個(gè)地方:
- 消息生產(chǎn)者將消息發(fā)送到RocketMQ Broker的這個(gè)過程可能出現(xiàn)消息丟失。
- RocketMQ Broker接收到生產(chǎn)者發(fā)送的消息存儲(chǔ)的過程消息可能丟失。
- 消費(fèi)者處理失敗,但是將錯(cuò)誤進(jìn)行捕捉,導(dǎo)致消息出現(xiàn)虛假的消費(fèi)成功。實(shí)際上沒有消費(fèi),但是在MQ看來消費(fèi)完成了消費(fèi)。
3 如何解決RokectMQ消息丟失
解決消息丟失從消息丟失的地方入手。
3.1 消息生產(chǎn)防止消息丟失
RocketMQ消息生產(chǎn)方式有三種:同步發(fā)送消息、異步發(fā)送消息、One-Way發(fā)送消息。不同的發(fā)送方式使用不同的場(chǎng)景:
同步發(fā)送消息: 重要的通知(訂單狀態(tài)的更新)、短信系統(tǒng)。
異步發(fā)送消息: 通常用于響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景。
One-way: 主要用于對(duì)可靠性要求不高的場(chǎng)景,在金融的場(chǎng)景下不適用。一般是用于日志收集。
根據(jù)上面三種發(fā)送方式的特點(diǎn), one-way 消息發(fā)送模式本身就是對(duì)消息的丟失無法保證。所以如果你的系統(tǒng)對(duì)消息丟失零容忍不能使用 one-way 的方式發(fā)送。同步發(fā)送消息和異步發(fā)送消息 都可以判斷消息的發(fā)送狀態(tài)判斷消息是否已經(jīng)發(fā)送到Broker。這里是選擇同步發(fā)送還是異步發(fā)送消息看業(yè)務(wù)的需要,同步發(fā)送比較關(guān)心發(fā)送后返回的結(jié)果對(duì)時(shí)間的要求不是那么敏感。異步發(fā)送對(duì)消息返回時(shí)間敏感。
SendResult定義說明(來自RocketMQ官方)
- SEND_OK
- 消息發(fā)送成功。要注意的是消息發(fā)送成功也不意味著它是可靠的。要確保不會(huì)丟失任何消息,還應(yīng)啟用同步Master服務(wù)器或同步刷盤,即SYNC_MASTER或SYNC_FLUSH。
- FLUSH_DISK_TIMEOUT
- 消息發(fā)送成功但是服務(wù)器刷盤超時(shí)。此時(shí)消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列(內(nèi)存),只有服務(wù)器宕機(jī),消息才會(huì)丟失。消息存儲(chǔ)配置參數(shù)中可以設(shè)置刷盤方式和同步刷盤時(shí)間長(zhǎng)度,如果Broker服務(wù)器設(shè)置了刷盤方式為同步刷盤,即FlushDiskType=SYNC_FLUSH(默認(rèn)為異步刷盤方式),當(dāng)Broker服務(wù)器未在同步刷盤時(shí)間內(nèi)(默認(rèn)為5s)完成刷盤,則將返回該狀態(tài)——刷盤超時(shí)。
- FLUSH_SLAVE_TIMEOUT
- 消息發(fā)送成功,但是服務(wù)器同步到Slave時(shí)超時(shí)。此時(shí)消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列,只有服務(wù)器宕機(jī),消息才會(huì)丟失。如果Broker服務(wù)器的角色是同步Master,即SYNC_MASTER(默認(rèn)是異步Master即ASYNC_MASTER),并且從Broker服務(wù)器未在同步刷盤時(shí)間(默認(rèn)為5秒)內(nèi)完成與主服務(wù)器的同步,則將返回該狀態(tài)——數(shù)據(jù)同步到Slave服務(wù)器超時(shí)。
- SLAVE_NOT_AVAILABLE
- 消息發(fā)送成功,但是此時(shí)Slave不可用。如果Broker服務(wù)器的角色是同步Master,即SYNC_MASTER(默認(rèn)是異步Master服務(wù)器即ASYNC_MASTER),但沒有配置slave Broker服務(wù)器,則將返回該狀態(tài)——無Slave服務(wù)器可用。
3.2 RocketMQ Broker防丟失消息
首先了解一下Broker集群部署模式(官方方案)。
單Master模式
這種方式風(fēng)險(xiǎn)較大,一旦Broker重啟或者宕機(jī)時(shí),會(huì)導(dǎo)致整個(gè)服務(wù)不可用。不建議線上環(huán)境使用,可以用于本地測(cè)試。
多Master模式
一個(gè)集群無Slave,全是Master,例如2個(gè)Master或者3個(gè)Master,這種模式的優(yōu)缺點(diǎn)如下
- 優(yōu)點(diǎn):配置簡(jiǎn)單,單個(gè)Master宕機(jī)或重啟維護(hù)對(duì)應(yīng)用無影響,在磁盤配置為RAID10時(shí),即使機(jī)器宕機(jī)不可恢復(fù)情況下,由于RAID10磁盤非常可靠,消息也不會(huì)丟(異步刷盤丟失少量消息,同步刷盤一條不丟),性能最高;
- 缺點(diǎn):?jiǎn)闻_(tái)機(jī)器宕機(jī)期間,這臺(tái)機(jī)器尚未被消費(fèi)的消息在機(jī)器恢復(fù)之前不可訂閱,消息實(shí)時(shí)性會(huì)受到影響。
多Master多Slave模式-異步復(fù)制
每個(gè)Master配置一個(gè)Slave,有多對(duì)Master-Slave,HA采用異步復(fù)制方式,主備有短暫消息延遲(毫秒級(jí)),這種模式的優(yōu)缺點(diǎn)如下
- 優(yōu)點(diǎn):即使磁盤損壞,消息丟失的非常少,且消息實(shí)時(shí)性不會(huì)受影響,同時(shí)Master宕機(jī)后,消費(fèi)者仍然可以從Slave消費(fèi),而且此過程對(duì)應(yīng)用透明,不需要人工干預(yù),性能同多Master模式幾乎一樣;
- 缺點(diǎn):Master宕機(jī),磁盤損壞情況下會(huì)丟失少量消息(非同步刷盤的情況下)
多Master多Slave模式-同步雙寫
每個(gè)Master配置一個(gè)Slave,有多對(duì)Master-Slave,HA采用同步雙寫方式,即只有主備都寫成功,才向應(yīng)用返回成功,這種模式的優(yōu)缺點(diǎn)如下:
- 優(yōu)點(diǎn):數(shù)據(jù)與服務(wù)都無單點(diǎn)故障,Master宕機(jī)情況下,消息無延遲,服務(wù)可用性與數(shù)據(jù)可用性都非常高;
- 缺點(diǎn):性能比異步復(fù)制模式略低(大約低10%左右),發(fā)送單個(gè)消息的RT會(huì)略高,且目前版本在主節(jié)點(diǎn)宕機(jī)后,備機(jī)不能自動(dòng)切換為主機(jī)
如果是想不存在消息丟失的情況,那么在多Master的情況下要配置消息同步刷盤,而在 多Master多Slave模式-同步雙寫 的情況下配置同步刷盤。
3.3 消費(fèi)端處理消息
消息消費(fèi)示例代碼如下:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
//處理消息
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
在處理消息的時(shí)候,如果消息處理失敗返回的狀態(tài)不應(yīng)該是
ConsumeConcurrentlyStatus.CONSUME_SUCCESS 。如果消息處理失敗返回的是
ConsumeConcurrentlyStatus.CONSUME_SUCCESS 消息就不能再次被消息。在Broker看來就是已經(jīng)消費(fèi)完成。
4 總結(jié)
MQ消息的丟失主要發(fā)生在發(fā)送、存儲(chǔ)、消費(fèi)消息的三個(gè)階段,所以需要防止消息丟失也要從這三個(gè)方面著手。
- 發(fā)送消息使用同步或者異步的方式,然后根據(jù)返回的消息 SendResult 來判斷是否發(fā)送成功
- Broker的刷盤方式配置成同步刷盤
- 消息消息失敗根據(jù)業(yè)務(wù)需要來判斷是否需要重新消費(fèi)消息。
我是螞蟻背大象,文章對(duì)你有幫助點(diǎn)贊關(guān)注我,文章有不正確的地方請(qǐng)您斧正留言評(píng)論~謝謝!






