問題背景
當使用消息隊列時,客戶端重復消費可能會成為一個嚴重的問題。 這是因為消息隊列具有持久性和可靠性的特性,確保消息能夠被成功傳遞給消費者。然而,這也會導致客戶端在某些情況下重復消費消息,例如網絡故障、客戶端崩潰、消息處理失敗等情況。
為了避免這種情況發生,需要在客戶端實現一些機制來確保消息不會被重復消費,例如記錄消費者已經處理的消息 ID、使用分布式鎖來控制消費進程的唯一性等。這些機制能夠保證消息被成功處理,同時也能夠提高系統的可靠性和穩定性。
今天的文章我們將探討如何確保消息隊列中的消息不會被重復消費,下文將以 RocketMQ 為例說明。
消息冪等性
消息中間件是分布式系統中常用的組件,它具有廣泛的應用價值,例如實現異步化、解耦、削峰等功能。通常情況下,我們認為消息中間件是一個可靠的組件。這里的可靠性指的是,只要消息被成功投遞到了消息中間件,它就不會丟失,至少能夠被消費者成功消費一次。這是消息中間件最基本的特性之一,也就是我們通常所說的 “AT LEAST ONCE”,即消息至少會被成功消費一遍。
舉個例子,假設一個消息M被發送到消息中間件并被消費程序A接收到,A開始消費這個消息,但是在消費過程中程序重啟了。由于這個消息沒有被標記為已經被消費成功,消息中間件會持續地將這個消息投遞給消費者,直到消息被成功消費為止。
然而,這種可靠性特性也會導致消息被多次投遞的情況。舉個例子,仍然以之前的例子為例,如果消費程序A接收并完成消息M的消費邏輯后,正準備通知消息中間件“我已經消費成功了”,但在此之前程序A又重啟了,那么對于消息中間件來說,這個消息M并沒有被成功消費過,因此消息中間件會繼續投遞這個消息。而對于消費程序A來說,盡管它已經成功消費了這個消息,但由于程序重啟導致消息中間件繼續投遞,看起來就好像這個消息還沒有被成功消費過一樣。
在 RockectMQ 的場景中,這意味著同一個 messageId 的消息會被重復投遞。由于消息的可靠投遞是更重要的,所以避免消息重復投遞的任務轉移給了應用程序自身來實現。這也是 RocketMQ 文檔強調消費邏輯需要自行實現冪等性的原因。實際上,這背后的邏輯是:在分布式場景下,保證消息不丟和避免消息重復投遞是矛盾的,但是消息重復投遞是可以解決的,而消息丟失則非常麻煩。
冪等設計
讓我們先來了解一下郵件消息的發送流程,以便更好了解消息隊列冪等工作原理。

正如我們在之前提到的,RocketMQ 遵循 "AT LEAST ONCE" 語義,這意味著消息可能會被重復消費。在發送郵件消息的情況下,由于消息可能被重復消費,我們需要保證冪等性,以確保郵件不會被重復發送。
1. 消息發送邏輯
下面這塊代碼是 12306 支付結果回調訂單邏輯實現,通過 RocketMQMessageListener 監聽并消費 RocketMQ 消息。
JAVA復制代碼@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = OrderRocketMQConstant.PAY_GLOBAL_TOPIC_KEY,
selectorExpression = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_TAG_KEY,
consumerGroup = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_CG_KEY
)
public class PayResultCallbackOrderConsumer implements RocketMQListener<MessageWrApper<PayResultCallbackOrderEvent>> {
private final OrderService orderService;
@Transactional(rollbackFor = Exception.class)
@Override
public void onMessage(MessageWrapper<PayResultCallbackOrderEvent> message) {
PayResultCallbackOrderEvent payResultCallbackOrderEvent = message.getMessage();
OrderStatusReversalDTO orderStatusReversalDTO = OrderStatusReversalDTO.builder()
.orderSn(payResultCallbackOrderEvent.getOrderSn())
.orderStatus(OrderStatusEnum.ALREADY_PAID.getStatus())
.build();
orderService.statusReversal(orderStatusReversalDTO);
orderService.payCallbackOrder(payResultCallbackOrderEvent);
}
}
2. 冪等處理邏輯
下述方案的優點在于,使用 redis 消息去重表,不依賴事務,針對消息表本身做了狀態的區分:消費中、消費完成。
如果消息已經在消費中,拋出異常,消息會觸發延遲消費,在 RocketMQ 的場景下即發送到 RETRY TOPIC。

通過該方案可以解決什么問題?
- 消息已經消費成功了,第二條消息將被直接冪等處理掉(消費成功)。
- 并發場景下的消息,依舊能滿足不會出現消息重復,即穿透冪等擋板的問題。
- 支持上游業務生產者重發的業務重復的消息冪等問題。
為什么要給初始化的冪等標識新增 10 分鐘過期時間?
在并發場景下,我們使用消息狀態來實現并發控制,以使第二條消息被不斷延遲消費(即重試)。但如果在此期間第一條消息也因某些異常原因(例如機器重啟或外部異常)未成功消費,該怎么辦呢?因為每次查詢時都會顯示消費中的狀態,所以延遲消費會一直進行下去,直到最終被視為消費失敗并被投遞到死信 Topic 中(RocketMQ 默認最多可以重復消費 16 次)。
針對這個問題,我們采取了一種解決方案:在插入消息表時,必須為每條消息設置一個最長消費過期時間,例如 10 分鐘。這意味著,如果某個消息在消費過程中超過了 10 分鐘,就會被視為消費失敗并從消息表中刪除。
抽象冪等通用組件
為了解決消息隊列中的重復消費問題,我們可以設計一套通用的消息隊列冪等組件。這個組件可以被各個應用程序使用,以確保它們的消費邏輯是冪等的。這種通用的冪等組件可以使應用程序不必為了解決重復消費問題而浪費精力和時間,從而更專注于業務邏輯的實現。
在企業項目中,使用 MySQL 作為冪等去重表的情況比較少見,因此在代碼中只提供了 Redis 實現方案。
1. 定義冪等注解
我們提供了一種通用的冪等注解,該注解可用于 RestAPI 和消息隊列消息防重復場景。
java復制代碼@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Idempotent {
/**
* 冪等Key,只有在 {@link Idempotent#type()} 為 {@link IdempotentTypeEnum#SPEL} 時生效
*/
String key() default "";
/**
* 觸發冪等失敗邏輯時,返回的錯誤提示信息
*/
String message() default "您操作太快,請稍后再試";
/**
* 驗證冪等類型,支持多種冪等方式
* RestAPI 建議使用 {@link IdempotentTypeEnum#TOKEN} 或 {@link IdempotentTypeEnum#PARAM}
* 其它類型冪等驗證,使用 {@link IdempotentTypeEnum#SPEL}
*/
IdempotentTypeEnum type() default IdempotentTypeEnum.PARAM;
/**
* 驗證冪等場景,支持多種 {@link IdempotentSceneEnum}
*/
IdempotentSceneEnum scene() default IdempotentSceneEnum.RESTAPI;
/**
* 設置防重令牌 Key 前綴,MQ 冪等去重可選設置
* {@link IdempotentSceneEnum#MQ} and {@link IdempotentTypeEnum#SPEL} 時生效
*/
String uniqueKeyPrefix() default "";
/**
* 設置防重令牌 Key 過期時間,單位秒,默認 1 小時,MQ 冪等去重可選設置
* {@link IdempotentSceneEnum#MQ} and {@link IdempotentTypeEnum#SPEL} 時生效
*/
long keyTimeout() default 3600L;
}
為了方便理解,整理成思維導圖方便記憶。

2. 定義 AOP 增強
我們使用 AOP 技術為方法增強提供了通用的冪等性保證,只需要在需要保證冪等性的方法上添加 @Idempotent 注解,Aspect 就會對該方法進行增強。
這種技術不僅適用于 RestAPI 場景,還適用于消息隊列的防重復消費場景。
java復制代碼package org.opengoofy.index12306.framework.starter.idempotent.core;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.opengoofy.index12306.framework.starter.idempotent.annotation.Idempotent;
import java.lang.reflect.Method;
/**
* 冪等注解 AOP 攔截器
*
* @公眾號:馬丁玩編程,回復:加群,添加馬哥微信(備注:12306)獲取項目資料
*/
@Aspect
public final class IdempotentAspect {
/**
* 增強方法標記 {@link Idempotent} 注解邏輯
*/
@Around("@annotation(org.opengoofy.index12306.framework.starter.idempotent.annotation.Idempotent)")
public Object idempotentHandler(ProceedingJoinPoint joinPoint) throws Throwable {
Idempotent idempotent = getIdempotent(joinPoint);
IdempotentExecuteHandler instance = IdempotentExecuteHandlerFactory.getInstance(idempotent.scene(), idempotent.type());
Object resultObj;
try {
instance.execute(joinPoint, idempotent);
resultObj = joinPoint.proceed();
instance.postProcessing();
} catch (RepeatConsumptionException ex) {
/**
* 觸發冪等邏輯時可能有兩種情況:
* * 1. 消息還在處理,但是不確定是否執行成功,那么需要返回錯誤,方便 RocketMQ 再次通過重試隊列投遞
* * 2. 消息處理成功了,該消息直接返回成功即可
*/
if (!ex.getError()) {
return null;
}
throw ex;
} catch (Throwable ex) {
// 客戶端消費存在異常,需要刪除冪等標識方便下次 RocketMQ 再次通過重試隊列投遞
instance.exceptionProcessing();
throw ex;
} finally {
IdempotentContext.clean();
}
return resultObj;
}
public static Idempotent getIdempotent(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());
return targetMethod.getAnnotation(Idempotent.class);
}
}
這個方法的執行邏輯與設計部分相同,因此在此處不再貼出具體的代碼。大家可以跟著設計閱讀冪等源碼。
為了提高通用性和抽象性,該組件采用了模板方法和簡單工廠等設計模式,這有助于隔離復雜性和提高可擴展性。如果您在學習過程中遇到問題,歡迎在知識星球 APP 上向我提問。
3. 實際場景使用
以實現支付結果回調訂單為例,我們可以將通用組件引入到消息消費的邏輯中,具體流程如下:
java復制代碼@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = OrderRocketMQConstant.PAY_GLOBAL_TOPIC_KEY,
selectorExpression = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_TAG_KEY,
consumerGroup = OrderRocketMQConstant.PAY_RESULT_CALLBACK_ORDER_CG_KEY
)
public class PayResultCallbackOrderConsumer implements RocketMQListener<MessageWrapper<PayResultCallbackOrderEvent>> {
private final OrderService orderService;
@Idempotent(
uniqueKeyPrefix = "index12306-order:pay_result_callback:",
key = "#message.getKeys()+'_'+#message.hashCode()",
type = IdempotentTypeEnum.SPEL,
scene = IdempotentSceneEnum.MQ,
keyTimeout = 7200L
)
@Transactional(rollbackFor = Exception.class)
@Override
public void onMessage(MessageWrapper<PayResultCallbackOrderEvent> message) {
PayResultCallbackOrderEvent payResultCallbackOrderEvent = message.getMessage();
OrderStatusReversalDTO orderStatusReversalDTO = OrderStatusReversalDTO.builder()
.orderSn(payResultCallbackOrderEvent.getOrderSn())
.orderStatus(OrderStatusEnum.ALREADY_PAID.getStatus())
.build();
orderService.statusReversal(orderStatusReversalDTO);
orderService.payCallbackOrder(payResultCallbackOrderEvent);
}
}
支持通過 SpEL 表達式來充當冪等去重表唯一鍵,通過一個簡單的注解,完美解決消息隊列重復消費問題。
更復雜的冪等場景
到這里,方案看起來非常完美,所有的消息都可以快速接入去重,而且與具體業務實現完全解耦。但是,是否這樣就可以完美地完成去重的所有任務呢? 很遺憾,實際上并非如此。因為需要確保消息至少成功消費一次,因此消息在消費過程中有可能失敗并觸發重試。
還是以上面的例子,假設消息消費的流程包含:
- 檢查庫存(RPC)
- 鎖庫存(RPC)
- 開啟事務,插入訂單表(MySQL)
- 調用某些其他下游服務(RPC)
- 更新訂單狀態
- commit 事務(MySQL)
當消息消費到第三步的時候假設 MySQL 異常導致失敗了,觸發消息重試。在重試前我們會刪除冪等表的記錄,所以消息重試的時候就會重新進入消費代碼,那么步驟 1 和步驟 2 就會重新再執行一遍。
如果步驟 2 本身不是冪等的,那么這個業務消息消費依舊沒有做好完整的冪等處理。
1. 通用方法實現價值
盡管這種方式并不能完全解決消息冪等問題(事實上,軟件工程領域里很少有銀彈可以完全解決問題),但它仍然具有很大的價值。通過這種簡便的方式,我們能夠解決以下問題:
- 各種由于Broker、負載均衡等原因導致的消息重投遞的重復問題。
- 各種上游生產者導致的業務級別消息重復問題。
- 重復消息并發消費的控制窗口問題,就算重復,重復也不可能同一時間進入消費邏輯。
2. 消息去重的建議
使用這種方法可以確保在正常的消費邏輯場景下(無異常,無異常退出),消息的冪等性全部得到解決,無論是業務重復還是 RocketMQ 特性帶來的重復。雖然它不是解決消息冪等性的銀彈,但它以一種簡單和便捷的方式提供了解決方案。
實際上,這種方法已經可以解決 99% 的消息重復問題了,因為異常情況通常是少數情況。但是,如果希望在異常情況下也能處理好冪等問題,可以采取以下措施來降低問題發生的概率:
- 消息消費失敗時,應該及時回滾處理。如果消息消費失敗本身具備回滾機制,則消息重試也就沒有副作用了。
- 為了盡可能避免程序異常退出導致的消息重試,需要在消費者代碼中做好優雅退出處理。
- 針對一些無法做到完全冪等的操作,至少要做到終止消息的消費并進行告警。比如鎖定庫存的操作,如果通過業務流水號已經成功鎖定了庫存,再次觸發鎖庫存操作的話,如果無法做到冪等性處理,那么至少要在消息消費過程中觸發異常(如因主鍵沖突導致消費異常等),并終止消息的消費,以避免重復消費產生的副作用。
- 在 #3 做好的前提下,做好消息的消費監控,發現消息重試不斷失敗的時候,手動做好 #1 的回滾,使得下次重試消費成功。
文末總結
當我們在使用 RocketMQ 進行消息處理時,消息的冪等性是一個非常重要的問題。本文通過抽象出通用組件的方式,實現了 RestAPI 和 RocketMQ 的冪等處理。 同時,我們也發現,冪等性并不是一個銀彈,不同的業務場景需要不同的冪等處理策略。
但是,通過一些基本的處理策略,如優雅退出、回滾處理、消費監控等,我們能夠大大減少消息重復的問題,提高消息消費的穩定性和可靠性。 在實際開發中,需要結合具體業務場景,選擇合適的冪等處理策略,并且在每次新的場景出現時,都需要仔細考慮是否需要重新審視冪等性的處理方式。
上文中的代碼以及實現已在基礎架構模塊中定義,詳情查看。

原文鏈接:
https://juejin.cn/post/7270139990696722484