
整體閱讀時間,在 40 分鐘左右。
常見的消息隊列很多,主要包括 RabbitMQ、Kafka、RocketMQ 和 ActiveMQ,相關的選型可以看我之前的系列,這篇文章只講 RabbitMQ,先講原理,后搞實戰。
文章很長,如果你能一次性看完,“大哥,請收下我的膝蓋”,建議大家先收藏,啥時需要面試,或者工作中遇到了,可以再慢慢看。
不 BB,直接上思維導圖:

1. 消息隊列
1.1 消息隊列模式
消息隊列目前主要 2 種模式,分別為“點對點模式”和“發布/訂閱模式”。
1.1.1 點對點模式
一個具體的消息只能由一個消費者消費,多個生產者可以向同一個消息隊列發送消息,但是一個消息在被一個消息者處理的時候,這個消息在隊列上會被鎖住或者被移除并且其他消費者無法處理該消息。
需要額外注意的是,如果消費者處理一個消息失敗了,消息系統一般會把這個消息放回隊列,這樣其他消費者可以繼續處理。

1.1.2 發布/訂閱模式
單個消息可以被多個訂閱者并發的獲取和處理。一般來說,訂閱有兩種類型:
- 臨時(ephemeral)訂閱:這種訂閱只有在消費者啟動并且運行的時候才存在。一旦消費者退出,相應的訂閱以及尚未處理的消息就會丟失。
- 持久(durable)訂閱:這種訂閱會一直存在,除非主動去刪除。消費者退出后,消息系統會繼續維護該訂閱,并且后續消息可以被繼續處理。

1.2 衡量標準
對消息隊列進行技術選型時,需要通過以下指標衡量你所選擇的消息隊列,是否可以滿足你的需求:
- 消息順序:發送到隊列的消息,消費時是否可以保證消費的順序,比如A先下單,B后下單,應該是A先去扣庫存,B再去扣,順序不能反。
- 消息路由:根據路由規則,只訂閱匹配路由規則的消息,比如有A/B兩者規則的消息,消費者可以只訂閱A消息,B消息不會消費。
- 消息可靠性:是否會存在丟消息的情況,比如有A/B兩個消息,最后只有B消息能消費,A消息丟失。
- 消息時序:主要包括“消息存活時間”和“延遲/預定的消息”,“消息存活時間”表示生產者可以對消息設置TTL,如果超過該TTL,消息會自動消失;“延遲/預定的消息”指的是可以延遲或者預訂消費消息,比如延時5分鐘,那么消息會5分鐘后才能讓消費者消費,時間未到的話,是不能消費的。
- 消息留存:消息消費成功后,是否還會繼續保留在消息隊列。
- 容錯性:當一條消息消費失敗后,是否有一些機制,保證這條消息是一種能成功,比如異步第三方退款消息,需要保證這條消息消費掉,才能確定給用戶退款成功,所以必須保證這條消息消費成功的準確性。
- 伸縮:當消息隊列性能有問題,比如消費太慢,是否可以快速支持庫容;當消費隊列過多,浪費系統資源,是否可以支持縮容。
- 吞吐量:支持的最高并發數。
2. RabbitMQ 原理初探
RabbitMQ 2007 年發布,是使用 Erlang 語言開發的開源消息隊列系統,基于 AMQP 協議來實現。
2.1 基本概念
提到RabbitMQ,就不得不提AMQP協議。AMQP協議是具有現代特征的二進制協議。是一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。
先了解一下AMQP協議中間的幾個重要概念:
- Server:接收客戶端的連接,實現AMQP實體服務。
- Connection:連接,應用程序與Server的網絡連接,TCP連接。
- Channel:信道,消息讀寫等操作在信道中進行。客戶端可以建立多個信道,每個信道代表一個會話任務。
- Message:消息,應用程序和服務器之間傳送的數據,消息可以非常簡單,也可以很復雜。由Properties和Body組成。Properties為外包裝,可以對消息進行修飾,比如消息的優先級、延遲等高級特性;Body就是消息體內容。
- Virtual Host:虛擬主機,用于邏輯隔離。一個虛擬主機里面可以有若干個Exchange和Queue,同一個虛擬主機里面不能有相同名稱的Exchange或Queue。
- Exchange:交換器,接收消息,按照路由規則將消息路由到一個或者多個隊列。如果路由不到,或者返回給生產者,或者直接丟棄。RabbitMQ常用的交換器常用類型有direct、topic、fanout、headers四種,后面詳細介紹。
- Binding:綁定,交換器和消息隊列之間的虛擬連接,綁定中可以包含一個或者多個RoutingKey。
- RoutingKey:路由鍵,生產者將消息發送給交換器的時候,會發送一個RoutingKey,用來指定路由規則,這樣交換器就知道把消息發送到哪個隊列。路由鍵通常為一個“.”分割的字符串,例如“com.rabbitmq”。
- Queue:消息隊列,用來保存消息,供消費者消費。
2.2 工作原理
AMQP 協議模型由三部分組成:生產者、消費者和服務端,執行流程如下:
- 生產者是連接到 Server,建立一個連接,開啟一個信道。
- 生產者聲明交換器和隊列,設置相關屬性,并通過路由鍵將交換器和隊列進行綁定。
- 消費者也需要進行建立連接,開啟信道等操作,便于接收消息。
- 生產者發送消息,發送到服務端中的虛擬主機。
- 虛擬主機中的交換器根據路由鍵選擇路由規則,發送到不同的消息隊列中。
- 訂閱了消息隊列的消費者就可以獲取到消息,進行消費。

2.3 常用交換器
RabbitMQ常用的交換器類型有direct、topic、fanout、headers四種:
- Direct Exchange:見文知意,直連交換機意思是此交換機需要綁定一個隊列,要求該消息與一個特定的路由鍵完全匹配。簡單點說就是一對一的,點對點的發送。

- Fanout Exchange:這種類型的交換機需要將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。簡單點說就是發布訂閱。

- Topic Exchange:直接翻譯的話叫做主題交換機,如果從用法上面翻譯可能叫通配符交換機會更加貼切。這種交換機是使用通配符去匹配,路由到對應的隊列。通配符有兩種:"*" 、 "#"。需要注意的是通配符前面必須要加上"."符號。
- *符號:有且只匹配一個詞。比如 a.*可以匹配到"a.b"、"a.c",但是匹配不了"a.b.c"。
- #符號:匹配一個或多個詞。比如"rabbit.#"既可以匹配到"rabbit.a.b"、"rabbit.a",也可以匹配到"rabbit.a.b.c"。

- Headers Exchange:這種交換機用的相對沒這么多。它跟上面三種有點區別,它的路由不是用routingKey進行路由匹配,而是在匹配請求頭中所帶的鍵值進行路由。創建隊列需要設置綁定的頭部信息,有兩種模式:全部匹配和部分匹配。如上圖所示,交換機會根據生產者發送過來的頭部信息攜帶的鍵值去匹配隊列綁定的鍵值,路由到對應的隊列。

2.4 消費原理
我們先看幾個基本概念:
- broker:每個節點運行的服務程序,功能為維護該節點的隊列的增刪以及轉發隊列操作請求。
- master queue:每個隊列都分為一個主隊列和若干個鏡像隊列。
- mirror queue:鏡像隊列,作為master queue的備份。在master queue所在節點掛掉之后,系統把mirror queue提升為master queue,負責處理客戶端隊列操作請求。注意,mirror queue只做鏡像,設計目的不是為了承擔客戶端讀寫壓力。
集群中有兩個節點,每個節點上有一個broker,每個broker負責本機上隊列的維護,并且borker之間可以互相通信。集群中有兩個隊列A和B,每個隊列都分為master queue和mirror queue(備份)。那么隊列上的生產消費怎么實現的呢?

對于消費隊列,如下圖有兩個consumer消費隊列A,這兩個consumer連在了集群的不同機器上。RabbitMQ集群中的任何一個節點都擁有集群上所有隊列的元信息,所以連接到集群中的任何一個節點都可以,主要區別在于有的consumer連在master queue所在節點,有的連在非master queue節點上。
因為mirror queue要和master queue保持一致,故需要同步機制,正因為一致性的限制,導致所有的讀寫操作都必須都操作在master queue上(想想,為啥讀也要從master queue中讀?和數據庫讀寫分離是不一樣的),然后由master節點同步操作到mirror queue所在的節點。即使consumer連接到了非master queue節點,該consumer的操作也會被路由到master queue所在的節點上,這樣才能進行消費。

對于生成隊列,原理和消費一樣,如果連接到非 master queue 節點,則路由過去。

所以,到這里小伙伴們就可以看到 RabbitMQ的不足:由于master queue單節點,導致性能瓶頸,吞吐量受限。雖然為了提高性能,內部使用了Erlang這個語言實現,但是終究擺脫不了架構設計上的致命缺陷。
2.5 高級特性
2.5.1 過期時間
Time To Live,也就是生存時間,是一條消息在隊列中的最大存活時間,單位是毫秒,下面看看RabbitMQ過期時間特性:
- RabbitMQ可以對消息和隊列設置TTL。
- RabbitMQ支持設置消息的過期時間,在消息發送的時候可以進行指定,每條消息的過期時間可以不同。
- RabbitMQ支持設置隊列的過期時間,從消息入隊列開始計算,直到超過了隊列的超時時間配置,那么消息會變成死信,自動清除。
- 如果兩種方式一起使用,則過期時間以兩者中較小的那個數值為準。
- 當然也可以不設置TTL,不設置表示消息不會過期;如果設置為0,則表示除非此時可以直接將消息投遞到消費者,否則該消息將被立即丟棄。
2.5.2 消息確認
為了保證消息從隊列可靠地到達消費者,RabbitMQ提供了消息確認機制。
消費者訂閱隊列的時候,可以指定autoAck參數,當autoAck為true的時候,RabbitMQ采用自動確認模式,RabbitMQ自動把發送出去的消息設置為確認,然后從內存或者硬盤中刪除,而不管消費者是否真正消費到了這些消息。
當autoAck為false的時候,RabbitMQ會等待消費者回復的確認信號,收到確認信號之后才從內存或者磁盤中刪除消息。
消息確認機制是RabbitMQ消息可靠性投遞的基礎,只要設置autoAck參數為false,消費者就有足夠的時間處理消息,不用擔心處理消息的過程中消費者進程掛掉后消息丟失的問題。
2.5.3 持久化
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保證消息可靠性的呢?答案就是消息持久化。持久化可以防止在異常情況下丟失數據。RabbitMQ的持久化分為三個部分:交換器持久化、隊列持久化和消息的持久化。
交換器持久化可以通過在聲明隊列時將durable參數設置為true。如果交換器不設置持久化,那么在RabbitMQ服務重啟之后,相關的交換器元數據會丟失,不過消息不會丟失,只是不能將消息發送到這個交換器了。
隊列的持久化能保證其本身的元數據不會因異常情況而丟失,但是不能保證內部所存儲的消息不會丟失。要確保消息不會丟失,需要將其設置為持久化。隊列的持久化可以通過在聲明隊列時將durable參數設置為true。
設置了隊列和消息的持久化,當RabbitMQ服務重啟之后,消息依然存在。如果只設置隊列持久化或者消息持久化,重啟之后消息都會消失。
當然,也可以將所有的消息都設置為持久化,但是這樣做會影響RabbitMQ的性能,因為磁盤的寫入速度比內存的寫入要慢得多。
對于可靠性不是那么高的消息可以不采用持久化處理以提高整體的吞吐量。魚和熊掌不可兼得,關鍵在于選擇和取舍。在實際中,需要根據實際情況在可靠性和吞吐量之間做一個權衡。
2.5.4 死信隊列
當消息在一個隊列中變成死信之后,他能被重新發送到另一個交換器中,這個交換器成為死信交換器,與該交換器綁定的隊列稱為死信隊列。
消息變成死信有下面幾種情況:
- 消息被拒絕。
- 消息過期
- 隊列達到最大長度
DLX也是一個正常的交換器,和一般的交換器沒有區別,他能在任何的隊列上面被指定,實際上就是設置某個隊列的屬性。當這個隊列中有死信的時候,RabbitMQ會自動將這個消息重新發送到設置的交換器上,進而被路由到另一個隊列,我們可以監聽這個隊列中消息做相應的處理。
死信隊列有什么用?當發生異常的時候,消息不能夠被消費者正常消費,被加入到了死信隊列中。后續的程序可以根據死信隊列中的內容分析當時發生的異常,進而改善和優化系統。
2.5.5 延遲隊列
一般的隊列,消息一旦進入隊列就會被消費者立即消費。延遲隊列就是進入該隊列的消息會被消費者延遲消費,延遲隊列中存儲的對象是的延遲消息,“延遲消息”是指當消息被發送以后,等待特定的時間后,消費者才能拿到這個消息進行消費。
延遲隊列用于需要延遲工作的場景。最常見的使用場景:淘寶或者天貓我們都使用過,用戶在下單之后通常有30分鐘的時間進行支付,如果這30分鐘之內沒有支付成功,那么訂單就會自動取消。
除了延遲消費,延遲隊列的典型應用場景還有延遲重試。比如消費者從隊列里面消費消息失敗了,可以延遲一段時間以后進行重試。
2.6 特性分析
這里才是內容的重點,不僅需要知道Rabbit的特性,還需要知道支持這些特性的原因:
- 消息路由(支持):RabbitMQ可以通過不同的交換器支持不同種類的消息路由;
- 消息有序(不支持):當消費消息時,如果消費失敗,消息會被放回隊列,然后重新消費,這樣會導致消息無序;
- 消息時序(非常好):通過延時隊列,可以指定消息的延時時間,過期時間TTL等;
- 容錯處理(非常好):通過交付重試和死信交換器(DLX)來處理消息處理故障;
- 伸縮(一般):伸縮其實沒有非常智能,因為即使伸縮了,master queue還是只有一個,負載還是只有這一個master queue去抗,所以我理解RabbitMQ的伸縮很弱(個人理解)。
- 持久化(不太好):沒有消費的消息,可以支持持久化,這個是為了保證機器宕機時消息可以恢復,但是消費過的消息,就會被馬上刪除,因為RabbitMQ設計時,就不是為了去存儲歷史數據的。
- 消息回溯(不支持):因為消息不支持永久保存,所以自然就不支持回溯。
- 高吞吐(中等):因為所有的請求的執行,最后都是在master queue,它的這個設計,導致單機性能達不到十萬級的標準。
3. RabbitMQ環境搭建
因為我用的是mac,所以直接可以參考官網:
https://www.rabbitmq.com/install-homebrew.html
需要注意的是,一定需要先執行:
brew update
然后再執行:
brew install rabbitmq
之前沒有執行brew update,直接執行brew install rabbitmq時,會報各種各樣奇怪的錯誤,其中“403 Forbidde”居多。
但是在執行“brew install rabbitmq”,會自動安裝其它的程序,如果你使用源碼安裝Rabbitmq,因為啟動該服務依賴erlang環境,所以你還需手動安裝erlang,但是目前官方已經一鍵給你搞定,會自動安裝Rabbitmq依賴的所有程序,是不是很棒!

最后執行成功的輸出如下:

啟動服務:
# 啟動方式1:后臺啟動
brew services start rabbitmq
# 啟動方式2:當前窗口啟動
cd /usr/local/Cellar/rabbitmq/3.8.19
rabbitmq-server
在瀏覽器輸入:
http://localhost:15672/
會出現RabbitMQ后臺管理界面(用戶名和密碼都為guest):

通過brew安裝,一行命令搞定,真香!
4. RabbitMQ測試
4.1 添加賬號
首先得啟動mq
## 添加賬號
./rabbitmqctl add_user admin admin
## 添加訪問權限
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 設置超級權限
./rabbitmqctl set_user_tags admin administrator
4.2 編碼實測
因為代碼中引入了JAVA 8的特性,pom引入依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>
<plugins>
<plugin>
<groupId>org.Apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
開始寫代碼:
public class RabbitMqTest {
//消息隊列名稱
private final static String QUEUE_NAME = "hello";
@Test
public void send() throws java.io.IOException, TimeoutException {
//創建連接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
//創建連接
Connection connection = factory.newConnection();
//創建消息通道
Channel channel = connection.createChannel();
//生成一個消息隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 0; i < 10; i++) {
String message = "Hello World RabbitMQ count: " + i;
//發布消息,第一個參數表示路由(Exchange名稱),為""則表示使用默認消息路由
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
//關閉消息通道和連接
channel.close();
connection.close();
}
@Test
public void consumer() throws java.io.IOException, TimeoutException {
//創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
//創建連接
Connection connection = factory.newConnection();
//創建消息信道
final Channel channel = connection.createChannel();
//消息隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("[*] Waiting for message. To exist press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
執行send()后控制臺輸出:
[x] Sent 'Hello World RabbitMQ count: 0'
[x] Sent 'Hello World RabbitMQ count: 1'
[x] Sent 'Hello World RabbitMQ count: 2'
[x] Sent 'Hello World RabbitMQ count: 3'
[x] Sent 'Hello World RabbitMQ count: 4'
[x] Sent 'Hello World RabbitMQ count: 5'
[x] Sent 'Hello World RabbitMQ count: 6'
[x] Sent 'Hello World RabbitMQ count: 7'
[x] Sent 'Hello World RabbitMQ count: 8'
[x] Sent 'Hello World RabbitMQ count: 9'

執行consumer()后:

示例中的代碼講解,可以直接參考官網:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
5. 基本使用姿勢
5.1 公共代碼封裝
封裝工廠類:
public class RabbitUtil {
public static ConnectionFactory getConnectionFactory() {
//創建連接工程,下面給出的是默認的case
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
return factory;
}
}
封裝生成者:
public class MsgProducer {
public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//創建連接
Connection connection = factory.newConnection();
//創建消息通道
Channel channel = connection.createChannel();
// 聲明exchange中的消息為可持久化,不自動刪除
channel.exchangeDeclare(exchange, exchangeType, true, false, null);
// 發布消息
channel.basicPublish(exchange, toutingKey, null, message.getBytes());
System.out.println("Sent '" + message + "'");
channel.close();
connection.close();
}
}
封裝消費者:
public class MsgConsumer {
public static void consumerMsg(String exchange, String queue, String routingKey)
throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//創建連接
Connection connection = factory.newConnection();
//創建消息信道
final Channel channel = connection.createChannel();
//消息隊列
channel.queueDeclare(queue, true, false, false, null);
//綁定隊列到交換機
channel.queueBind(queue, exchange, routingKey);
System.out.println("[*] Waiting for message. To exist press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
System.out.println(" [x] Received '" + message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 取消自動ack
channel.basicConsume(queue, false, consumer);
}
}
5.2 Direct方式

5.2.1 Direct示例
生產者:
public class DirectProducer {
private static final String EXCHANGE_NAME = "direct.exchange";
public void publishMsg(String routingKey, String msg) {
try {
MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
DirectProducer directProducer = new DirectProducer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String msg = "hello >>> ";
for (int i = 0; i < 10; i++) {
directProducer.publishMsg(routingKey[i % 3], msg + i);
}
System.out.println("----over-------");
Thread.sleep(1000 * 60 * 100);
}
}
執行生產者,往消息隊列中放入10條消息,其中key分別為“aaa”、“bbb”和“ccc”,分別放入qa、qb、qc三個隊列:

下面是qa隊列的信息:

消費者:
public class DirectConsumer {
private static final String exchangeName = "direct.exchange";
public void msgConsumer(String queueName, String routingKey) {
try {
MsgConsumer.consumerMsg(exchangeName, queueName, routingKey);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
DirectConsumer consumer = new DirectConsumer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String[] queueNames = new String[]{"qa", "qb", "qc"};
for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], routingKey[i]);
}
Thread.sleep(1000 * 60 * 100);
}
}
執行后的輸出:
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 0
[x] Done
[x] Received 'hello >>> 3
[x] Done
[x] Received 'hello >>> 6
[x] Done
[x] Received 'hello >>> 9
[x] Done
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 1
[x] Done
[x] Received 'hello >>> 4
[x] Done
[x] Received 'hello >>> 7
[x] Done
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 2
[x] Done
[x] Received 'hello >>> 5
[x] Done
[x] Received 'hello >>> 8
[x] Done
可以看到,分別從qa、qb、qc中將不同的key的數據消費掉。
5.2.2 問題探討
有個疑問:這個隊列的名稱qa、qb和qc是RabbitMQ自動生成的么,我們可以指定隊列名稱么?
我做了個簡單的實驗,我把消費者代碼修改了一下:
public static void main(String[] args) throws InterruptedException {
DirectConsumer consumer = new DirectConsumer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String[] queueNames = new String[]{"qa", "qb", "qc1"}; // 將qc修改為qc1
for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], routingKey[i]);
}
Thread.sleep(1000 * 60 * 100);
}
執行后如下圖所示:

我們可以發現,多了一個qc1,所以可以判斷這個界面中的queues,是消費者執行時,會將消費者指定的隊列名稱和direct.exchange綁定,綁定的依據就是key。
當我們把隊列中的數據全部消費掉,然后重新執行生成者后,會發現qc和qc1中都有3條待消費的數據,因為綁定的key都是“ccc”,所以兩者的數據是一樣的:

綁定關系如下:

注意:當沒有Queue綁定到Exchange時,往Exchange中寫入的消息也不會重新分發到之后綁定的queue上。
思考:不執行消費者,看不到這個Queres中信息,我其實可以把這個界面理解為消費者信息界面。不過感覺還是怪怪的,這個queues如果是消費者信息,就不應該叫queues,我理解queues應該是RabbitMQ中實際存放數據的queues,難道是我理解錯了?
5.3 Fanout方式(指定隊列)

生產者封裝:
public class FanoutProducer {
private static final String EXCHANGE_NAME = "fanout.exchange";
public void publishMsg(String routingKey, String msg) {
try {
MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutProducer directProducer = new FanoutProducer();
String msg = "hello >>> ";
for (int i = 0; i < 10; i++) {
directProducer.publishMsg("", msg + i);
}
}
}
消費者:
public class FanoutConsumer {
private static final String EXCHANGE_NAME = "fanout.exchange";
public void msgConsumer(String queueName, String routingKey) {
try {
MsgConsumer.consumerMsg(EXCHANGE_NAME, queueName, routingKey);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutConsumer consumer = new FanoutConsumer();
String[] queueNames = new String[]{"qa-2", "qb-2", "qc-2"};
for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], "");
}
}
}
執行生成者,結果如下:

我們發現,生產者生產的10條數據,在每個消費者中都可以消費,這個是和Direct不同的地方,但是使用Fanout方式時,有幾個點需要注意一下:
- 生產者的routkey可以為空,因為生產者的所有數據,會下放到每一個隊列,所以不會通過routkey去路由;
- 消費者需要指定queues,因為消費者需要綁定到指定的queues才能消費。

這幅圖就畫出了Fanout的精髓之處,exchange會和所有的queue進行綁定,不區分路由,消費者需要綁定指定的queue才能發起消費。
注意:往隊列塞數據時,可能通過界面看不到消息個數的增加,可能是你之前已經開啟了消費進程,導致增加的消息馬上被消費了。
5.4 Fanout方式(隨機獲取隊列)
上面我們是指定了隊列,這個方式其實很不友好,比如對于Fanout,我其實根本無需關心隊列的名字,如果還指定對應隊列進行消費,感覺這個很冗余,所以我們這里就采用隨機獲取隊列名字的方式,下面代碼直接Copy官網。
生成者封裝:
public static void publishMsgV2(String exchange, BuiltinExchangeType exchangeType, String message) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//創建連接
Connection connection = factory.newConnection();
//創建消息通道
Channel channel = connection.createChannel();
// 聲明exchange中的消息
channel.exchangeDeclare(exchange, exchangeType);
// 發布消息
channel.basicPublish(exchange, "", null, message.getBytes("UTF-8"));
System.out.println("Sent '" + message + "'");
channel.close();
connection.close();
}
消費者封裝:
public static void consumerMsgV2(String exchange) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(exchange, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchange, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
生產者:
public class FanoutProducer {
private static final String EXCHANGE_NAME = "fanout.exchange.v2";
public void publishMsg(String msg) {
try {
MsgProducer.publishMsgV2(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutProducer directProducer = new FanoutProducer();
String msg = "hello >>> ";
for (int i = 0; i < 10000; i++) {
directProducer.publishMsg(msg + i);
}
}
}
消費者:
public class FanoutConsumer {
private static final String EXCHANGE_NAME = "fanout.exchange.v2";
public void msgConsumer() {
try {
MsgConsumer.consumerMsgV2(EXCHANGE_NAME);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutConsumer consumer = new FanoutConsumer();
for (int i = 0; i < 3; i++) {
consumer.msgConsumer();
}
}
}
執行后,管理界面如下:



5.5 Topic方式

代碼詳見官網:https://www.rabbitmq.com/tutorials/tutorial-five-java.html
更多方式,請直接查看官網:https://www.rabbitmq.com/getstarted.html

6. RabbitMQ 進階
6.1 durable 和 autoDeleted
在定義Queue時,可以指定這兩個參數:
/**
* Declare an exchange.
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
* @param autoDelete true if the server should delete the exchange when it is no longer in use
* @param arguments other properties (construction arguments) for the exchange
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
6.1.1 durable
持久化,保證RabbitMQ在退出或者crash等異常情況下數據沒有丟失,需要將queue,exchange和Message都持久化。
若是將queue的持久化標識durable設置為true,則代表是一個持久的隊列,那么在服務重啟之后,會重新讀取之前被持久化的queue。
雖然隊列可以被持久化,但是里面的消息是否為持久化,還要看消息的持久化設置。即重啟queue,但是queue里面還沒有發出去的消息,那隊列里面還存在該消息么?這個取決于該消息的設置。
6.1.2 autoDeleted
自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用于臨時隊列。
當一個Queue被設置為自動刪除時,當消費者斷掉之后,queue會被刪除,這個主要針對的是一些不是特別重要的數據,不希望出現消息積累的情況。
6.1.3 小節
- 當一個Queue已經聲明好了之后,不能更新durable或者autoDelted值;當需要修改時,需要先刪除再重新聲明
- 消費的Queue聲明應該和投遞的Queue聲明的 durable,autoDelted屬性一致,否則會報錯
- 對于重要的數據,一般設置 durable=true, autoDeleted=false
- 對于設置 autoDeleted=true 的隊列,當沒有消費者之后,隊列會自動被刪除
6.4 ACK
執行一個任務可能需要花費幾秒鐘,你可能會擔心如果一個消費者在執行任務過程中掛掉了。一旦RabbitMQ將消息分發給了消費者,就會從內存中刪除。在這種情況下,如果正在執行任務的消費者宕機,會丟失正在處理的消息和分發給這個消費者但尚未處理的消息。
但是,我們不想丟失任何任務,如果有一個消費者掛掉了,那么我們應該將分發給它的任務交付給另一個消費者去處理。
為了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收并且處理完畢了。RabbitMQ就可以刪除它了。
因此手動ACK的常見手段:
// 接收消息之后,主動ack/nak
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
System.out.println(" [ " + queue + " ] Received '" + message);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
};
// 取消自動ack
channel.basicConsume(queue, false, consumer);






