大家都知道,RocketMQ 消費模式有 PULL 模式和 PUSH 模式,不過本質上都是 PULL 模式,而在實際使用時,一般使用 PUSH 模式。
不過,RocketMQ 的 PUSH 模式有明顯的不足,主要體現在以下幾個方面:
- 消息積壓了,增加消費者不一定能解決。PUSH 模式如下圖:

上面的圖中,消費組中的消費者每個消費者消費兩個 MessageQueue,這種情況下,增加消費者是可以提高消費能力的。
但是下面這張圖,每個消費者消費一個 MessageQueue,因為同一個 MessageQueue 只能被同一個消費組中的一個消費者消費,所以增加消費者并不能提高消費能力。

- 客戶端的處理邏輯比較多,比如負載均衡、offset 管理、消費失敗后的處理(比如失敗消息發送回 Broker),這些邏輯都在客戶端。
- 如果再支持其他語言,客戶端會變得越來越重。
- 消費者機器 hang 住,可能會導致消息積壓,如下圖:

通過客戶端負責均衡,MessageQueue0 這個隊列分配給了 Consumer0 進行獨占消費,如果 Consumer0 這個消費者 hang 住了,但是服務沒有掛,不能從 Name Server 中下線,因為 Consumer0 拉取到的消息不能消費,也就不能給 Broker 發送更新 Offset 的請求,最終導致消息積壓。這種情況只能手動讓 Consumer0 下線或者讓 Consumer0 重啟。
RocketMQ 5.0 為了解決 PUSH Consumer 上面的問題,引入了 POP Consumer。
1 POP 客戶端
POP 模式的客戶端引入的背景是 RocketMQ 5.0 為了更好地擁抱云原生,客戶端要改造成無狀態的輕量級客戶端,RocketMQ 4.x 中客戶端具有的負載均衡、權限管理、消費管理等功能都從客戶端移動到了 Proxy。
POP 消費模式如下圖:

四個消費者都可以消費 Broker1 和 Broker2 上面的所有隊列,這樣即使某一個消費者 hang 住了,其他消費者也可以消費,并不會造成消息積壓。
同時,從上圖中可以看到,POP 客戶端還有一個優勢,增加消費者數量是可以提高消費能力的,不受 MessageQueue 數量和消費者數量的限制。
跟 PUSH 模式相比,POP 模式拉取到消息后,會設置一個 POP_CK 屬性,代碼如下:
//MQClientAPIImpl.JAVA
if (requestHeader instanceof PopMessageRequestHeader) {
if (startOffsetInfo == null) {
// we should set the check point info to extraInfo field , if the command is popMsg
// find pop ck offset
String key = messageExt.getTopic() + messageExt.getQueueId();
if (!map.containsKey(messageExt.getTopic() + messageExt.getQueueId())) {
map.put(key, ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(),
messageExt.getTopic(), brokerName, messageExt.getQueueId()));
}
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
} else {
String queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
String queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(), messageExt.getQueueId(), messageExt.getQueueOffset());
int index = sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
Long msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index);
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
responseHeader.getReviveQid(), messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset)
);
//...
}
}
可以看到,POP_CK 屬性包含了 brokerName、Topic、QueueId、offset 等參數,通過這個屬性可以唯一標識一條消息了。
從上面的代碼還可以看到,responseHeader 中有一個 invisibleTime 屬性,這個屬性的作用是消費者通過 POP 模式拉取到一條消息后,這段時間(invisibleTime)內這條消息在 Broker 端是不可見的,消費者再次拉取就不會重復拉取到。但是如果過了這段時間,消費者還沒有給 Broker 返回 ACK,這條消息會變為可見,再次被消費者拉取到。
消費完成后,向 Broker 發送 ACK 消息,見下面代碼:
public void ackMessageAsync(
final String addr,
final long timeOut,
final AckCallback ackCallback,
final AckMessageRequestHeader requestHeader //
) throws RemotingException, MQBrokerException, InterruptedException {
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) {
@Override
public void onComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
AckResult ackResult = new AckResult();
if (ResponseCode.SUCCESS == response.getCode()) {
ackResult.setStatus(AckStatus.OK);
} //...
assert ackResult != null;
ackCallback.onSuccess(ackResult);
} //...
} else {
//...
}
}
});
}
2. Broker
從上面的介紹可以看到,每個消費者都可以從 Broker 的所有 MessageQueue 上拉取消息,那如果多個消費者都從一個 MessageQueue 上面拉取,有沒有可能會重復消費呢?
Broker 收到消息拉取請求,從 MessageStore 拉取消息時,首先會給 MessageQueue 進行加鎖,加鎖成功后,才會拉取消息,這是其他客戶端來拉取時就會加鎖失敗。
//PopMessageProcessor.java
String lockKey = topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
long offset = getPopOffset(topic, requestHeader, queueId, false, lockKey);
if (!queueLockManager.tryLock(lockKey)) {
restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
return restNum;
}
Broker 從 MessageStore 拉取到消息后,會定義一個 CheckPoint 放入緩存,代碼如下:
//PopMessageProcessor.java
private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,
Channel channel, long popTime,
ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
requestHeader.getConsumerGroup()) : requestHeader.getTopic();
String lockKey =
topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
//...
offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);
GetMessageResult getMessageTmpResult = null;
try {
//...
restNum = getMessageTmpResult.getMaxOffset() - getMessageTmpResult.getNextBeginOffset() + restNum;
if (!getMessageTmpResult.getMessageMapedList().isEmpty()) {
if (isOrder) {
//...
} else {
AppendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName());
}
} //...
} //...
return restNum;
}
Broker 收到消費者發來的 ACK 后,會把 CheckPoint 從緩存中移除。
如果 Broker 一直沒有收到 ACK,則會把 CheckPoint 從緩存中移除,同時把 CheckPoint 發送給 MessageStore,由 MessageStore 發送到重試隊列。代碼如下:
boolean removeCk = !this.serving;
// ck will be timeout
if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {
removeCk = true;
}
// the time stayed is too long
if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) {
removeCk = true;
}
// double check
if (removeCk) {
// put buffer ak to store
if (pointWrapper.getReviveQueueOffset() < 0) {
putCkToStore(pointWrapper, false);
}
}
}
3 總結
POP 客戶端有很多的優勢,總結如下:
- 無狀態,更好地擁抱云原生;
- 計算相關的功能下移到 Proxy,更加輕量級;
- 消費能力擴展不受 MessageQueue 數量的限制;
- 消費者 hang 住,并不會導致消息積壓。






