一、關(guān)于 RabbitMQ
說到 RabbitMQ,相信大家都不會陌生,微服務(wù)開發(fā)中必不可少的中間件。
在上篇關(guān)于消息隊列的文章中,我們了解到 RabbitMQ 本質(zhì)其實是用 Erlang 開發(fā)的 AMQP(Advanced Message Queuing Protocol )的具體實現(xiàn),最初起源于金融系統(tǒng),主要用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴展性、高可用性等方面有著不俗的表現(xiàn)。
2010年4月,RabbitMQ 科技公司被 VMware 旗下的 SpringSource 收購,在 2013 年 5 月被并入 Pivotal 。
其實 VMware,Pivotal 本質(zhì)上是一家的。不同的是,VMware 是獨立上市子公司,而 Pivotal 是整合了EMC的某些資源,現(xiàn)在并沒有上市。其中我們現(xiàn)在使用的 Spring 系列框架,就是 Pivotal 公司熱門的產(chǎn)品之一。
直到后來 Pivotal 將其開源,RabbitMQ 才逐漸走向大眾!
RabbitMQ 發(fā)展到今天,已經(jīng)被越來越多的人認可,尤其是互聯(lián)網(wǎng)公司,已經(jīng)有著大規(guī)模的場景應(yīng)用,今天我們就一起來深入了解一下 RabbitMQ。
二、RabbitMQ 模型介紹
2.1、內(nèi)部結(jié)構(gòu)分析
上面我們有說到 RabbitMQ 本質(zhì)是 AMQP 協(xié)議的一個開源實現(xiàn),在詳細介紹 RabbitMQ 之前,我們先來看一下 AMQP 的內(nèi)部結(jié)構(gòu)圖!
基本概念如下:
- Publisher:消息的生產(chǎn)者,也是一個向交換器發(fā)布消息的客戶端應(yīng)用程序
- Exchange:交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊列
- Binding:綁定,用于將消息隊列和交換器之間建立關(guān)聯(lián)。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規(guī)則,所以可以將它理解成一個由綁定構(gòu)成的路由表。
- Queue:消息隊列,用來保存消息直到發(fā)送給消費者
- Connection:網(wǎng)絡(luò)連接,比如一個 TCP 連接
- Channel:信道,多路復(fù)用連接中的一條獨立的雙向數(shù)據(jù)流通道
- Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端應(yīng)用程序
- Virtual Host:虛擬主機,表示一批交換器、消息隊列和相關(guān)對象。虛擬主機是共享相同的身份認證和加密環(huán)境的獨立服務(wù)器域。每個 vhost 本質(zhì)上就是一個 mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊列、交換器、綁定和權(quán)限機制。vhost 是 AMQP 概念的基礎(chǔ),必須在連接時指定,RabbitMQ 默認的 vhost 是 /
- Broker:表示消息隊列服務(wù)器實體
- Message:消息實體,它由消息頭和消息體組成。消息頭主要由路由鍵、交換器、隊列、priority(相對于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲)等屬性組成,而消息體就是指具體的業(yè)務(wù)對象
相比傳統(tǒng)的 JMS 模型,AMQP 主要多了 Exchange、Binding 這個新概念。
在 AMQP 模型中,消息的生產(chǎn)者不是直接將消息發(fā)送到Queue隊列,而是將消息發(fā)送到Exchange交換器,其中還新加了一個中間層Binding綁定,作用就是通過路由鍵Key將交換器和隊列建立綁定關(guān)系。
就好比類似用戶表和角色表,中間通過用戶角色表來將用戶和角色建立關(guān)系,從而實現(xiàn)關(guān)系綁定,在 RabbitMQ 中,消息生產(chǎn)者不直接跟隊列建立關(guān)系,而是將消息發(fā)送到交換器之后,由交換器通過已經(jīng)建立好的綁定關(guān)系,將消息發(fā)送到對應(yīng)的隊列!
RabbitMQ 最終的架構(gòu)模型,核心部分就變成如下圖所示:
從圖中很容易看出,與 JMS 模型最明顯的差別就是消息的生產(chǎn)者不直接將消息發(fā)送給隊列,而是由Binding綁定決定交換器的消息應(yīng)該發(fā)送到哪個隊列,進一步實現(xiàn)了在消息的推送方面,更加靈活!
2.2、交換器分發(fā)策略
當消息的生產(chǎn)者將消息發(fā)送到交換器之后,是不會存儲消息的,而是通過中間層綁定關(guān)系將消息分發(fā)到不同的隊列上,其中交換器的分發(fā)策略分為四種:Direct、Topic、Headers、Fanout!
- Direct:直連類型,即在綁定時設(shè)定一個 routing_key, 消息的 routing_key 匹配時, 才會被交換器投送到綁定的隊列中去,原則是先匹配、后投送;
- Topic:按照規(guī)則轉(zhuǎn)發(fā)類型,支持通配符匹配,和 Direct 功能一樣,但是在匹配 routing_key的時候,更加靈活,支持通配符匹配,原則也是先匹配、后投送;
- Headers:頭部信息匹配轉(zhuǎn)發(fā)類型,根據(jù)消息頭部中的 header attribute 參數(shù)類型,將消息轉(zhuǎn)發(fā)到對應(yīng)的隊列,原則也是先匹配、后投送;
- Fanout:廣播類型,將消息轉(zhuǎn)發(fā)到所有與該交互機綁定的隊列上,不關(guān)心 routing_key;
2.2.1、Direct
Direct 是 RabbitMQ 默認的交換機模式,也是最簡單的模式,消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發(fā)到對應(yīng)的隊列中。
如果傳入的 routing key 為 black,不會轉(zhuǎn)發(fā)到black.green。Direct 類型交換器是完全匹配、單播的模式。
2.2.2、Topic
Topic 類型交換器轉(zhuǎn)發(fā)消息和 Direct 一樣,不同的是:它支持通配符轉(zhuǎn)發(fā),相比 Direct 類型更加靈活!
兩種通配符:*只能匹配一個單詞,#可以匹配零個或多個。
如果傳入的 routing key 為 black#,不僅會轉(zhuǎn)發(fā)到black,也會轉(zhuǎn)發(fā)到black.green。
2.2.3、Headers
headers 也是根據(jù)規(guī)則匹配, 相比 direct 和 topic 固定地使用 routing_key , headers 則是通過一個自定義匹配規(guī)則的消息頭部類進行匹配。
在隊列與交換器綁定時,會設(shè)定一組鍵值對規(guī)則,消息中也包括一組鍵值對( headers 屬性),當這些鍵值對有一對, 或全部匹配時,消息被投送到對應(yīng)隊列。
此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了。
2.2.4、Fanout
Fanout 類型交換器與上面幾個不同,不管路由鍵或者是路由模式,會把消息發(fā)給綁定給它的全部隊列,如果配置了 routing_key 會被忽略,也被成為消息廣播模式。很像子網(wǎng)廣播,每臺子網(wǎng)內(nèi)的主機都獲得了一份復(fù)制的消息
fanout 類型轉(zhuǎn)發(fā)消息在四種類型中是最快的。
三、RabbitMQ 安裝
RabbitMQ 基于 erlang 進行通信,相比其它的軟件,安裝有些麻煩,為了跟生產(chǎn)環(huán)境保持一直,操作系統(tǒng)選擇centos7,不過本例采用rpm方式安裝,任何新手都可以完成安裝,過程如下!
3.1、安裝前命令準備
輸入如下命令,完成安裝前的環(huán)境準備。
yum install lsof build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz wget vim
3.2、下載 RabbitMQ、erlang、socat 的安裝包
本次下載的是RabbitMQ-3.6.5版本,采用rpm一鍵安裝,適合新手直接上手。
先創(chuàng)建一個rabbitmq目錄,本例的目錄路徑為/usr/App/rabbitmq,然后在目錄下執(zhí)行如下命令,下載安裝包!
- 下載erlang
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
- 下載socat
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
- 下載rabbitMQ
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
最終目錄文件如下:
3.3、安裝軟件包
下載完之后,按順序依次安裝軟件包,這個很重要哦~
- 安裝erlang
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
- 安裝socat
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
- 安裝rabbitmq
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
安裝完成之后,修改rabbitmq的配置,默認配置文件在
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin目錄下。
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
修改loopback_users節(jié)點的值!
最后只需通過如下命令,啟動服務(wù)即可!
rabbitmq-server start &
運行腳本之后,如果報錯,例如下圖!
解決辦法如下:
vim /etc/rabbitmq/rabbitmq-env.conf
在文件里添加一行,如下配置!
NODENAME=rabbit@localhost
然后,再保存!再次以下命令啟動服務(wù)!
rabbitmq-server start &
通過如下命令,查詢服務(wù)是否啟動成功!
lsof -i:5672
如果出現(xiàn)5672已經(jīng)被監(jiān)聽,說明已經(jīng)啟動成功!
3.4、啟動可視化的管控臺
輸入如下命令,啟動控制臺!
rabbitmq-plugins enable rabbitmq_management
用瀏覽器打開http://ip:15672,這里的ip就是 CentOS 系統(tǒng)的 ip,結(jié)果如下:
賬號、密碼,默認為guest,如果出現(xiàn)無法訪問,檢測防火墻是否開啟,如果開啟將其關(guān)閉即可!
登錄之后的監(jiān)控平臺,界面如下:
四、web界面使用
相比其他的消息隊列,rabbitMQ 其中一個很明顯的好處就是有 web 操作界面,而且簡單易用。
進入 web 管理界面之后,可以很清晰的看到分了 6 個菜單目錄,分別是:Overview、Connections、Channels、Exchanges、Queues、Admin。
- Overview:總覽概述,主要介紹 rabbitmq 一些基礎(chǔ)匯總等信息
- Connections:連接池管理,主要介紹客戶端連接等信息
- Channels:信道管理,主要介紹信道連接等信息
點擊具體某個具體的信道,可以看到對應(yīng)的消費隊列等信息。
- Exchanges:交換器管理,主要介紹交換器等信息
- Queues:隊列管理,主要介紹隊列等信息
- Admin:系統(tǒng)管理,主要介紹用戶、虛擬主機、權(quán)限等信息
下面,我們重點介紹一些如何通過 web 頁面來操作 rabbitMQ!
4.1、交換器管理
點擊進入 Exchanges 菜單,最下面有一個Add a new exchange標簽。
點擊Add a new exchange,會展示如下信息!
- Name:交換器名稱
- Type:交換器類型
- Durability:是否持久化,Durable:持久化,Transient:不持久化
- Auto delete:是否自動刪除,當最后一個綁定(隊列或者exchange)被unbind之后,該exchange 自動被刪除
- Internal:是否是內(nèi)部專用exchange,是的話,就意味著我們不能往該exchange里面發(fā)消息
- Arguments:參數(shù),是AMQP協(xié)議留給AMQP實現(xiàn)做擴展使用的
我們先新建一個名稱為hello-exchange,類型為direct的交換器,結(jié)果如下。
等會用于跟隊列關(guān)聯(lián)!
4.2、隊列管理
點擊進入 Queues 菜單,最下面也有一個Add a new queue標簽。
點擊標簽,即可進入添加隊列操作界面!
- Name:隊列名稱
- Durability:是否持久化,Durable:持久化,Transient:不持久化
- Auto delete:是否自動刪除,是的話,當隊列內(nèi)容為空時,會自動刪除隊列
- Arguments:參數(shù),是AMQP協(xié)議留給AMQP實現(xiàn)做擴展使用的
同樣的,新建一個名稱為hello-mq的消息隊列,結(jié)果如下。
隊列新建好了之后,繼續(xù)來建立綁定關(guān)系!
4.3、綁定管理
建立綁定關(guān)系,既可以從隊列進入也可以從交換器進入。
如果是從交換器進入,那么被關(guān)聯(lián)的對象就是隊列。
如果是從隊列進入,那么被關(guān)聯(lián)的對象就是交換器。
我們選擇從隊列入手,被綁定的交換器是hello-exchange,因為類型是direct,所以還需要填寫routing key。
建立完成之后,在交換器那邊也可以看到對應(yīng)的綁定關(guān)系。
4.4、發(fā)送消息
最后,我們從交換器入手,選擇對應(yīng)的交換器,點擊Publish message標簽,填寫對應(yīng)的路由鍵 key,發(fā)送一下數(shù)據(jù),查看數(shù)據(jù)是否發(fā)送到對應(yīng)的隊列中。
然后點擊進入 Queues 菜單,查詢消息隊列基本情況。
然后選擇hello-mq消息隊列,點擊Get messages標簽,獲取隊列中的消息。
結(jié)果如下,可以很清晰的看到,消息寫入到隊列!
五、JAVA客戶端使用
RabbitMQ 支持多種語言訪問,本次介紹 RabbitMQ Java Client 的一些簡單的api使用,如聲明 Exchange、Queue,發(fā)送消息,消費消息,一些高級 api 會在后面的文章中詳細的說明。
5.1、引入 rabbitMQ 依賴包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
5.2、連接服務(wù)器
使用給定的參數(shù)(host name,端口等等)連接AMQP的服務(wù)器。
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
也可以使用通過 URI 方式進行連接。
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();
Connection(連接)接口可以被用作創(chuàng)建一個channel(管道),利用 channel(管道)可以進行發(fā)送和接收消息,在后面我們會頻繁使用到它。
Channel channel = conn.createChannel();
注意,管道使用之后,需要進行關(guān)閉。
channel.close();
conn.close();
5.3、創(chuàng)建交換器
不僅可以通過 web頁面進行創(chuàng)建交換器,還可以通過代碼進行聲明(創(chuàng)建的意思)交換器。
//創(chuàng)建exchange,類型是direct類型
channel.exchangeDeclare("ex-hello","direct");
//第三個參數(shù)表示是否持久化,同步操作,有返回值
AMQP.Exchange.DeclareOk ok = channel.exchangeDeclare("ex-hello","direct",true);
System.out.println(ok);
//創(chuàng)建帶屬性的交換器
Map<String,Object> argument = new HashMap<>();
argument.put("alternate-exchange","log");
channel.exchangeDeclare("ex-hello","direct",true,false,argument);
//異步創(chuàng)建exchange,沒有返回值
channel.exchangeDeclareNoWait("ex-hello","direct",true,false,false,argument);
///判斷exchange是否存在,存在的返回ok,不存在的exchange則報錯
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclarePassive("ex-hello");
System.out.println(declareOk);
//刪除exchange(可重復(fù)執(zhí)行),刪除一個不存在的也不會報錯
channel.exchangeDelete("ex-hello");
創(chuàng)建交換器參數(shù)解讀:
- 第一個參數(shù):表示交換器名稱
- 第二個參數(shù):表示交換器類型
- 第三個參數(shù):表示是否持久化,為true表示會將隊列持久化存儲到硬盤
- 第四個參數(shù):表示是否自動刪除,當最后一個綁定(隊列或者exchange)被unbind之后,該exchange 自動被刪除
- 第五個參數(shù):表示設(shè)置參數(shù),參數(shù)類型為Map<String, Object>
5.4、創(chuàng)建隊列
同樣的,也可以通過代碼進行聲明隊列。
//同步創(chuàng)建隊列
channel.queueDeclare(queueName, true, false, false, null);
//異步創(chuàng)建隊列沒有返回值
channel.queueDeclareNoWait(queueName,true,false,false,null);
//判斷queue是否存在,不存在會拋出異常
channel.exchangeDeclarePassive(queueName);
//刪除隊列
channel.queueDelete(queueName);
創(chuàng)建隊列參數(shù)解讀:
- 第一個參數(shù):表示隊列名稱
- 第二個參數(shù):表示是否持久化,為true表示會將隊列持久化存儲到硬盤
- 第三個參數(shù):表示是否排它性,為true表示只對首次聲明它的連接可見,會在其連接斷開的時候自動刪除
- 第四個參數(shù):表示是否自動刪除,為true表示有過消費者并且所有消費者都解除訂閱了,自動刪除隊列
- 第五個參數(shù):表示設(shè)置參數(shù),參數(shù)類型為Map<String, Object>
5.5、創(chuàng)建綁定
當交換器和隊列都創(chuàng)建成功之后,就可以建立綁定關(guān)系。
//交換器和隊列進行綁定(可重復(fù)執(zhí)行,不會重復(fù)創(chuàng)建)
channel.queueBind(queueName, exchangeName, routingKey);
//異步進行綁定,最后一個參數(shù)表示可以帶自定義參數(shù)
channel.queueBindNoWait(queueName,exchangeName,routingKey,null);
//exchange和queue進行解綁(可重復(fù)執(zhí)行)
channel.queueUnbind(queueName, exchangeName, routingKey);
//exchange與exchange進行綁定(可重復(fù)執(zhí)行,不會重復(fù)創(chuàng)建)
//第一個參數(shù)表示目標交換器
//第二個參數(shù)表示原地址交換器
//第三個參數(shù)表綁定路由key
channel.exchangeBind(destination,source,routingKey);
//exchange和exchange進行解綁(可重復(fù)執(zhí)行)
channel.exchangeUnbind(destination,source,routingKey);
綁定關(guān)系參數(shù)解讀:
- queueName:隊列名稱,取自創(chuàng)建的隊列名稱
- exchangeName:交換器,取自創(chuàng)建的交換器名稱
- routingKey:路由鍵key,自定義
5.6、發(fā)送消息
發(fā)送消息到交換器就會使用我們上文所提到的channel管道。
//發(fā)送的消息內(nèi)容
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
也可以在發(fā)送消息前設(shè)定一些消息屬性。
//自己構(gòu)建BasicProperties的對象
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("zhangsan")
.build()),
messageBodyBytes);
發(fā)送指定頭信息的消息。
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("userName", '"zhangsan');
headers.put("userCode", "123");
//發(fā)送消息到交換器
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build()),
messageBodyBytes);
發(fā)送一個有過期時間的消息,單位:ms。
//設(shè)置消息過期時間,單位ms
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("6000")
.build()),
messageBodyBytes);
更多用法,可以參見官方 API
5.7、接受消息
從消息隊列中接受消息也會使用我們上文所提到的channel管道。
//監(jiān)聽隊列中的消息
channel.basicConsume(queueName,true,new SimpleConsumer(channel));
監(jiān)聽隊列消息參數(shù)解讀:
- 第一個參數(shù):表示需要監(jiān)聽的隊列名稱
- 第二個參數(shù):表示是否自動確認,如果配置false表示手動確認消息是否收到
- 第三個參數(shù):表示消息處理類
具體的消息處理類需要繼承DefaultConsumer,并重寫handleDelivery方法,代碼如下:
public class SimpleConsumer extends DefaultConsumer{
public SimpleConsumer(Channel channel){
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//接受從隊列中發(fā)送的消息
System.out.println(consumerTag);
System.out.println("-----收到消息了---------------");
System.out.println("消息屬性為:"+properties);
System.out.println("消息內(nèi)容為:"+new String(body));
}
}
如果是手工確認消息,需要在handleDelivery方法中進行相關(guān)的確認,代碼如下:
//手動確認
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
5.8、完整demo
5.8.1、發(fā)送消息
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
//連接RabbitMQ服務(wù)器
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("197.168.24.206");
factory.setPort(5672);
//創(chuàng)建一個連接
Connection conn = factory.newConnection();
//獲得信道
Channel channel = conn.createChannel();
//聲明交換器
channel.exchangeDeclare("ex-hello","direct");
//發(fā)送的消息內(nèi)容
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish("ex-hello", "route-hello", null, messageBodyBytes);
//關(guān)閉通道
channel.close();
conn.close();
}
}
5.8.2、接受消息
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//連接RabbitMQ服務(wù)器
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("197.168.24.206");
factory.setPort(5672);
//創(chuàng)建一個連接
Connection conn = factory.newConnection();
//獲得信道
Channel channel = conn.createChannel();
//聲明隊列
channel.queueDeclare("queue-hello", true, false, false, null);
//聲明綁定
channel.queueBind("queue-hello", "ex-hello", "route-hello");
//監(jiān)聽隊列中的消息
channel.basicConsume("queue-hello",true,new SimpleConsumer(channel));
TimeUnit.SECONDS.sleep(10);
channel.close();
conn.close();
}
}
消息處理類SimpleConsumer
public class SimpleConsumer extends DefaultConsumer {
public SimpleConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//接受從隊列中發(fā)送的消息
System.out.println(consumerTag);
System.out.println("-----收到消息了---------------");
System.out.println("消息屬性為:"+properties);
System.out.println("消息內(nèi)容為:"+new String(body));
}
}
消息發(fā)送成功之后,啟動消費者,輸出結(jié)果如下:
六、總結(jié)
整篇文章主要介紹了 RabbitMQ 內(nèi)部結(jié)構(gòu)、安裝步驟、使用教程,以及 java 客戶端使用等內(nèi)容,內(nèi)容比較長,限于筆者的才疏學淺,對本文內(nèi)容可能還有理解不到位的地方,如有闡述不合理之處還望留言一起探討。






