今天閑聊下消息中間件的一些關鍵特性,對于消息中間件基礎知識,包括各種開源消息中間件的比較選型文章,網上已經有很多,在這里就不再重復進行描述。
因此這篇文章僅僅選擇一些消息中間件的一些關鍵特性和實踐中常遇到的問題進行總結。
同時兼顧消息發布訂閱和隊列

點對點隊列模式消息模式

發布訂閱模型Topic消息模型
消息中間件有一個關鍵能力,即1對多的消息發布訂閱模式。因此對于消息中間件也常說兩個能力,一個是Queue消息隊列能力,一個是Topiic消息主題能力。對于消息Topic即消息主題模式,支持消息的發布訂閱。
對于Queue模式消息被消費掉即從隊列中清除,不管是哪個消息端消費掉。對于Topic模式,單個訂閱端獲取到消息,消息并不會清除,而是需要所有訂閱端全部消費掉消息。
那么我們來舉一個最簡單的場景,即ERP系統需要將消息分發給CRM和SRM兩個系統。因此啟用了消息發布模式,建立了一個消息主題,比如會計科目分發消息。同時SRM和CRM系統是消息的訂閱方,因此在訂閱端寫了相應的監聽程序來訂閱消息。如下:

但是CRM和SRM都是集群部署,每個應用都有三個集群節點,每個節點在啟動的時候都會對消息主題進行訂閱和監控。
那么當ERP分發會計科目的時候,如果常規模式就變成了該消息會被6個訂閱端全部獲取到。這個時候對于CRM系統中集群三個點,獲取到三次消息顯然是重復的。因此我們需要對于訂閱端,在一個集群分組內部應該只要有一個節點獲取到消息,消息就應該清除掉。但是在不同的集群分組間,仍然應該是Topic模式。
因此在消息中間件實現中,需要有一個針對集群節點的進一步分組能力,比如上面的CRM和SRM應該分為兩個組,也就是常說的ClientID。有了ClientID分組,就可以實現組間的Topic能力,而組內啟用Queue模式。
不同的消息中間件在這塊的實現思路基本一致,即都需要有一個ClientID分組的概念。
比如在ActiveMQ中實現了虛擬Topic的功能。使用起來非常簡單。對于消息發布者來說,就是一個正常的Topic,名稱以VirtualTopic.開頭。例如VirtualTopic.TEST。
對于消息接收端來說,是個隊列,不同應用里使用不同的前綴作為隊列的名稱,即可表明自己的身份即可實現消費端應用分組。例如Consumer.A.VirtualTopic.TEST,說明它是名稱為A的消費端,同理Consumer.B.VirtualTopic.TEST說明是一個名稱為B的客戶端。
可以在同一個應用里使用多個consumer消費此queue,則可以實現上面兩個功能。又因為不同應用使用的queue名稱不同(前綴不同),所以不同的應用中都可以接收到全部的消息。每個客戶端相當于一個持久訂閱者,而且這個客戶端可以使用多個消費者共同來承擔消費任務。
同時兼顧消息發布訂閱和路由
進一步來談消息發布訂閱,還是基于前面的場景,ERP分發會計科目消息,當前有SRM和CRM兩個訂閱端。但是仍然會出現一些特殊的場景:
比如CRM接收到消息,但是自己處理出現問題導致消息丟失,需要讓ERP系統重發消息。其次就是某些會計科目可能只有SRM系統需要使用,需要定向只發送給SRM系統。那么在這種情況下一個Topic主題往往并不能支持該需求。當然對于不同的Route規則可以建立不同的Topic,但是如果路由規則復雜,這樣會建立出大量的Topic本身也不現實。

而對于這種常見,類似主流的基于JMS的消息中間件往往并沒有topic+路由規則的消息投遞能力,對于這種情況只能夠單獨新實現接口來進行處理。或者是消息仍然按傳統規則進行分發,對于訂閱端拿到消息如果確實自己不需要或已經存在,就自己丟棄掉。
而對于RabbitMQ,可以看到專門有一個Topic Exchange模式。Topic Exchange是根據routing key和Exchange的類型將message發送到一個或者多個Queue中,可以通過它來實現pub/sub模式,即發布訂閱。類似下圖:

可靠性和消息持久化
在整個技術架構中引入了消息中間件后,雖然達到了異步解耦的作用,但實質是增加了整個集成架構的復雜度,同時也影響了整體架構的可靠性。
對于消息中間件如何增加可靠性,一個方面是高可用集群架構,一個則是消息本身要支持持久化存儲。通過消息持久化存儲來確保消息不丟失。

對于消息持久化,一般來說分為文件和數據庫兩大類。對于文件本身又分為了本地磁盤文件和共享文件系統兩個方式。
本地磁盤方式可以看到,如果當前Broker節點宕機,雖然消息可以在本地暫存,但是在當前節點沒有手工恢復前,消費端無法及時獲取消息。對于這個問題的實際解決思路是啟用類似Master-Slave的部署架構方式,即消息會快速的實時同步到Slave節點。這樣即使Master節點無法訪問,訂閱端也可以快速的從Slave節點獲取消息。
對于共享文件系統或數據庫方式,實際消息只持久化到一個集中的地方,因此不存在消息多點復制的操作,某個節點宕機,共享存儲的消息也可以快速的恢復到其它冗余節點。
分布式事務
在前面談到過基于消息中間件來實現消息最終一致性,具體如下:
場景:在采購系統中擬制采購訂單,在提交單據申請的時候既需要將單據成功保存到本地,同時又需要啟動遠程流程平臺提供的流程啟動服務。在該場景中,第二個步驟屬于必須要最終完成的操作,同時業務上也允許最終一致(不要因為流程平臺本身問題導致單據提交不成功,啟流程失敗如何重試是系統內部的事情)
對于該場景,基于消息實現最終一致性邏輯如下:

可以看到,報賬單提交和流程啟動仍然需要控制到一個事務里面,這個本身也是一個分布式事務控制場景。特別是在消息中間件需要在獲取到消息進行持久化的時候。

也就是說從報賬單提交,到消息發送給消息中間件,兩個活動需要控制在一個事務里面。而對于消息中間件本身又有兩種做法。
一種是只要獲取到消息,消息還在內存里面就認為成功并返回給發送端,即異步刷盤。還有一種就是拿到消息后必須持久化寫入成功后才返回,即同步刷盤。
因此要確保無任何消息丟失,必須是同步刷盤模式,確保消息持久化成功才能夠返回。
持久化還有什么作用?
當訂閱端取消息的進度緩慢,或者說訂閱端遲遲不取走消息,那么消息就一直在消息中間件中堆積,消息中間件內存容量優先,必須將更多的歷史消息先進行持久化處理進行內存釋放。否則消息中間件本身也很難應對。
那么消息大量積壓如何辦?常用的方式就是設置消息的過期機制,比如設置為5天,那么超過5天訂閱端還沒有取走的消息就自動廢棄。但是這種方式本身也影響到消息發送的完整性。
當然對于擠壓消息也可以手工清理。
比如當前已經產生1億條消息,分別由ERP分發給SRM和CRM系統,但是CRM系統對這些消息沒有消費而處于積壓狀態。我們需要手工對CRM訂閱的這部分消息進行清理。這個時候可以看到和基于數據庫進行大表的Delete條件刪除差不多。
即性能極其緩慢。因為清理消息過程實際就是條件刪除過程,同時這些數據還可能存在于多個磁盤文件中,清除后還需要對磁盤文件進行重整。對于消息的持久化存儲前面談到,寫消息到磁盤文件性能極高,比數據庫快很多,但是一遇到這種條件刪除或消息清理場景,那么性能就會極差。
JMS還是AMQP
JMS(JAVA Messaging Service)是Java平臺上有關面向消息中間件(MOM)的技術規范,它便于消息系統中的Java應用程序進行消息交換,并且通過提供標準的產生、發送、接收消息的接口簡化企業應用的開發,翻譯為Java消息服務。
傳統企業內部使用的消息集成中間件,比如IBM MQ,Weblogic JMS, 開源的ActiveMQ等基本都基于JMS標準規范和協議。支持點對點隊列模式,也支持消息發布訂閱。
AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。基于此協議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產品,不同的開發語言等條件的限制。
比如RabbitMQ即是通過AMQP協議實現。
在這里并不能說基于AMQP實現的消息中間件性能就一定好于JMS,對于阿里基于JMS實現的RocketMQ網上能夠查到的性能測試數據實際是好于RabbitMQ的。
具體的測試數據可以參考網上的一個對比圖:

高可用性集群搭建
對于集群可以分為兩類,一類是應用中間件集群,一類是涉及到持久化中間件集群。
應用中間件集群
對于應用中間件集群,比如常見的weblogic,Tomcat應用中間件和Web容器。這類要實現高可用性本身相對容易,因為本身不涉及到數據的持久化存儲方面的問題。
應用集群高可用,唯一就是服務器節點之間的Session狀態信息同步,當前也有很多的方案,類似Session同步復制,采用數據庫,redis等共享庫來保持Sesion會話信息等。再簡單點,你完全可以在上層接入到負載均衡設備,通過負載均衡設備配置Session會話保持來保持會話信息。在這種情況下僅是需要容忍集群節點出現故障,當前在該節點的會話不可用。
其次就是集群節點的心跳檢測,一種是最簡單的方法就是心跳ping節點,如果出現異常則認為節點出現故障。而這種情況下無法判斷集群節點是否假死或Hang住了。因此也有進一步的做法,即采用OpenRestry來實現集群負載和動態的心跳監控。
持久化中間件集群

對于消息中間件,Redis緩存數據庫等都可以看到,典型特點就是涉及到數據持久化的問題。而這些中間件本身的高可用集群思路基本完全一樣。
集群節點配置問題
集群節點配置,可以看到有1主1從,多主,多(主+從)等多種模式來實現集群擴展。要看到主從的目的是實現Master宕機的時候消息還能夠從Slave上獲取,不影響消息實時性,Slave節點實際是實時在復制主節點數據,并不提供性能分擔。
而多主的目的則是真正分散消息并發泄到集群時候的性能問題。即使是一個最簡單的點對點消息模型,Producer也可以將消息分發到兩個Master節點進行性能分擔,以減輕單Master節點下的性能壓力。
所以如果僅僅是高可靠,Master+Slave基本就能夠滿足。但是要滿足高可靠+高性能,則必須搭建多個Master+多個Slave組成的集群。
管理或心跳監控節點
管理節點有很多方法,比如RocketMQ里面自己實現的Name Server集群,Redis集群實現的Sentinel哨兵集群,或者通用的Zookeeper集群來實現分布式協調等。
但是不論哪種方式都能看到管理節點往往是3臺服務器配置模式。
首先Admin節點采用單點肯定不行,即存在單點故障。
因此你至少需要配置兩臺Server來作為Admin管理節點,但是配置2臺服務器的時候又發現一個新問題,即如果2臺服務器檢測到的節點狀態不一致那么聽誰的?
正是這樣原因引入了第3臺服務器,即能夠超過半數的投票機制來實現文檔的集群管理節點。可以看到類似Kurbernetes集群同樣也是需要3臺服務器來實現多主的高可用配置。
持久化存儲
前面已經談到了持久化存儲。對于消息中間件在集群下,實際每個集群節點都可能接收到大量的消息,消息是存儲在本地磁盤還是共享文件存儲是需要考慮的問題。
如果是本地磁盤,那么一般需考慮Master-Slave模式,實現消息的實時同步復制,以確保在Master宕機的情況下不影響到消息的實時獲取。如果是共享存儲模式,實際可以看到不一定必須配置Slave節點,即某個Master宕機后,另外一個節點可以從共享存儲中重新裝載消息,雖然稍微有一定延遲,但是基本不會影響到消息的實時性。
類似Weblogic JMS集群基本采用這種方式,即共享文件進行持久化存儲,當節點出現故障的時候,請求信息會漂移到其它非故障節點完成高可用性。