同步通訊
發送方發出數據后,等接收方發回響應以后才發下一個數據包的通訊方式
同步調用的時效性強,可以立即獲取結果
同步調用的問題
我們以前在使用Feign或OpenFeign時,就是使用的同步調用
- 代碼耦合度高:每次加入新的需求,都要修改原來的代碼
- 性能低:調用者需要等待服務提供者響應,如果調用鏈過長則響應時間等于每次調用的時間之和。
- 資源浪費:調用鏈中的每個服務在等待響應過程中,不能釋放請求占用的資源,高并發場景下會極度浪費系統資源
- 級聯失敗:如果服務提供者出現問題,所有調用方都會跟著出問題,如同多米諾骨牌一樣,迅速導致整個微服務群故障
異步通訊
發送方發出數據后,不等接收方發回響應,接著發送下個數據包的通訊方式
其常見的實線就是事件驅動模式
可以實現服務解耦的問題,性能得到提升,吞吐量提高,服務沒有強依賴性,不必擔心級聯失敗問題,實現服務削峰
異步通信的問題
- 依賴于Broker的可靠性、安全性、吞吐能力
- 架構復雜了,業務沒有明顯的流程線,不好追蹤管理
什么是MQ
N (Message Quene):翻譯為j消息隊列,通過典型的生產者和消費者模型生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的侵入輕松地實現系統間解耦。別名為消息中間件,通過利用高效可靠的消息傳遞機制進行平臺無關的數據交流,并基于數據通信來進行分布式系統的集成。
1.ActiveMQ
ActiveM 是A4pache出品,最流行的,能力強勁的開源消息總線。它是一個完全支持/8規范的的消息中間件。豐富的API,多種集群架構模式讓認kctiveMA在業界成為老牌的消息中間件,在中小型企業頗受歡迎!
2.Kafka
Kafka是LinkedIn開源的分布式發布-訂閱消息系統,目前歸屬于Apache頂級項目。Kafka主要特點是基于Pu11的模式來處理消息消費,追求高吞吐量,一開始的目的就是用于日志收集和傳輸。8.8版本開始支持復制,不支持事務,對消息的重復、丟失、錯誤沒有嚴格要求,適合產生大量數據的互聯網服務的數據收集業務。
3.RocketMQ
RocketNQ是阿里開源的消息中間件,它是純JAVA開發,具有高吞吐量、高可用性、適合大規模分布式系統應用的特點。RocketNO思路起源于Kafka,但并不是Kafka的一個Copy,它對消息的可靠傳輸及事務性做了優化,目前在阿里集團被廣泛應用于交易、充值、流計算、消息推送、日志流式處理、binglog分發等場景。
4.RabbitMQ
RabbitNQ是使用Erlang語言開發的開源消息隊列系統,基于ANQP協議來實現。ANQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。ANQP協議更多用在企業系統內對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。
Docker安裝RabbitMQ
docker pull rabbitmq
//啟動
docker run
-e RABBITMQ_DEFAULT_USER=syf20020816
-e RABBITMQ_DEFAULT_PASS=20020816
--name mq
--hostname mq1
-p 15672:15672
-p 5672:5672
-d
rabbitmq:3.10-management

訪問15672端口


RabbitMq架構

- channel:操作MQ的工具
- exchange:路由消息到隊列中
- queue:緩存消息
- virtual host:虛擬主機,是對queue、exchange等資源的邏輯分組
QuickStart
簡單項目中(點對點的簡單模型)

1.引入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.15.0</version>
</dependency>
2.建立虛擬主機

添加完成后如圖:

3.建立生產者
/**
* 直連式連接
*/
public class Publisher {
public static void main(String[] args) throws IOException, TimeoutException {
//創建連接mq的連接工廠對象
final ConnectionFactory connectionFactory = new ConnectionFactory();
//設置主機
connectionFactory.setHost("192.168.112.101");
//設置端口號
connectionFactory.setPort(5672);
//設置連接虛擬主機
connectionFactory.setVirtualHost("/test");
//設置虛擬主機的用戶名密碼
connectionFactory.setUsername("ssf2asdas6");
connectionFactory.setPassword("204545454");
//獲取連接對象
final Connection connection = connectionFactory.newConnection();
//獲取連接中的通道
final Channel channel = connection.createChannel();
//通道綁定對應的消息隊列
//參數1:隊列名稱,不存在則創建
//參數2:定義隊列特性是否需要持久化,true:持久化,false:非持久化
//參數3:是否獨占隊列
//參數4:是否在消費完成后刪除隊列
//參數5:拓展附加參數
channel.queueDeclare("demo1",false,false,false,null);
//發布消息
//參數1:交換機名稱(exchange)
//參數2:隊列名稱
//參數3:傳遞消息額外設置
//參數4:消息的具體內容
channel.basicPublish("","demo1",null,"hello world".getBytes());
//關閉資源
channel.close();
connection.close();
}
}
你可以看到在隊列里就多了一條消息了

4.建立消費者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//創建鏈接工廠
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.112.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/test");
connectionFactory.setUsername("ssf2asdas6");
connectionFactory.setPassword("204545454");
//創建連接對象
final Connection connection = connectionFactory.newConnection();
//創建通道
final Channel channel = connection.createChannel();
channel.queueDeclare("demo1",false,false,false,null);
//消費消息
//參數1:消費隊列的名稱
//參數2:開啟消息的自動確認機制
//參數3:消費時的回調接口
channel.basicConsume("demo1",true, new DefaultConsumer(channel){
//最后一個參數:消息隊列中取出的消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
//關閉資源,若不關閉則一直進行監聽
channel.close();
connection.close();
}
}

消費后就看到這里隊列里的消息就清零了

SpringAMQP
Spring AMQP是基于AMQP協議定義的一套API規范,提供了模板來發送和接收消息。包含兩部分,其中spring-amqp是基礎抽象,spring-rabbit是底層的默認實現。
官方網址
https://spring.io/projects/spring-amqp
QuickStart
1. 初始化一個簡單微服務項目

結構如下

2.編寫yaml配置
無論你的消息發布者還是消息消費者都需要使用以下yaml配置
spring:
rabbitmq:
host: 192.168.112.101
port: 5672
virtual-host: /test
username: sysdaa6
password: 20asdsa16
3.發送消息
這里注意的是,你的隊列一定要是存在的
@SpringBootTest
class PublisherApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("demo1","hello springAMQP!");
}
}
4.接收消息(消費)
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "demo1")
public void listenQueue(String msg){
System.out.println(msg);
}
}
5.啟動測試

WorkQueue模型
使用工作隊列模型
工作隊列,可以提高消息處理速度,避免隊列消息堆積

消息預取機制
由于消息預取機制的存在,消息會平均地分配給每一個消費者

修改消費者的yaml修改預取機制
spring:
rabbitmq:
host: 192.168.112.101
port: 5672
virtual-host: /test
username: your username
password: your password
listener:
simple:
prefetch: 1 #表示每次只能獲取一條消息,處理完才能獲取下一條
消息發送者
@SpringBootTest
class PublisherApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testWorkQueue() throws InterruptedException {
String queueName = "demo1";
String msg = "this is the msg:";
for (int i = 0; i <= 50; i++) {
rabbitTemplate.convertAndSend(queueName,msg+i);
Thread.sleep(50);
}
}
}
消息消費者
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "demo1")
public void listenWorkQueue(String msg) throws InterruptedException {
System.out.println("=====consumer 1:=====|"+ LocalDateTime.now());
System.out.println(msg);
Thread.sleep(50);
}
@RabbitListener(queues = "demo1")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("=====consumer 2:=====|"+ LocalDateTime.now());
System.err.println(msg);
Thread.sleep(500);
}
}

發布訂閱模式
發布訂閱模式與之前案例的區別就是允許將同一消息發送給多個消費者。實現方式是加入了exchange(交換機)
exchange負責消息路由,而不是存儲,路由失敗則消息丟失

常見exchange類型包括
- Fanout:廣播
- Direct:路由
- Topic:話題
交換機的作用
- 接收publisher發送的消息
- 將消息按照規則路由到與之綁定的隊列
- 不能緩存消息,路由失敗,消息丟失
- FanoutExchange的會將消息路由到每個綁定的隊列
消息消費者
定義一個 FanoutConfig 配置類進行交換機和隊列的綁定
@Configuration
public class FanoutConfig {
//交換機
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("test.fanout");
}
//隊列1
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
//綁定隊列1到交換機
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
//隊列2
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
//綁定隊列2到交換機
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
消息發送者
@Test
void testSendFanoutExchange(){
//交換機名稱
String exchangeName = "test.fanout";
//消息
String msg = "test for send FanoutExchange";
//發送消息
rabbitTemplate.convertAndSend(exchangeName,"",msg);
}



路由模式
Direct Exchange 會將接收到的消息根據規則路由到指定的Queue,因此稱為路由模式(routes)。
- 每一個Queue都與Exchange設置一個BindingKey
- 發布者發送消息時,指定消息的RoutingKey
- Exchange將消息路由到BindingKey與消息RoutingKey一致的隊列

消息發送者
消息發送者中指明routerKey
@Test
void testSendDirectExchange(){
//交換機名稱
String exchangeName = "test.direct";
//消息
String msg = "test for router to black queue";
//發送消息
rabbitTemplate.convertAndSend(exchangeName,"black",msg);
}
@Test
void testSendDirectExchange2(){
//交換機名稱
String exchangeName = "test.direct";
//消息
String msg = "test for router to white queue";
//發送消息
rabbitTemplate.convertAndSend(exchangeName,"white",msg);
}
消息消費者
定義一個監聽組件使用注解形式指定隊列名稱,交換機名稱和類型(默認direct),以及路由通道
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "test.direct",type = ExchangeTypes.DIRECT),
key = {
"black","white"}
))
public void listenDirectQueue1(String msg){
System.out.println("direct queue1:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "test.direct",type = ExchangeTypes.DIRECT),
key = {
"black","green"}
))
public void listenDirectQueue2(String msg){
System.out.println("direct queue2:"+msg);
}
}



話題模式
TopicExchange與DirectExchange類似,區別在于routingKey必須是多個單詞的列表,并且以 . 分割。
如: person.zhangsan
Queue與Exchange指定BindingKey時可以使用通配符:
#
:代指0個或多個單詞*
:代指一個單詞

消息發送者
@Test
public void testTopicExchange(){
//交換機名稱
String exchangeName = "test.topic";
//消息
String msg = "test for topic in china shanghai";
//發送消息
rabbitTemplate.convertAndSend(exchangeName,"china.shanghai",msg);
}
消息消費者
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name ="test.topic",type = ExchangeTypes.TOPIC),
key = "china.shanghai"
))
public void listenTopicQueue1(String msg){
System.out.println("topic:china.shanghai:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name ="test.topic",type = ExchangeTypes.TOPIC),
key = "american.newyork"
))
public void listenTopicQueue2(String msg){
System.out.println("topic:american.newyork:"+msg);
}
結果
