亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.430618.com 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

同步通訊

發送方發出數據后,等接收方發回響應以后才發下一個數據包的通訊方式

同步調用的時效性強,可以立即獲取結果

同步調用的問題

我們以前在使用Feign或OpenFeign時,就是使用的同步調用

  1. 代碼耦合度高:每次加入新的需求,都要修改原來的代碼
  2. 性能低:調用者需要等待服務提供者響應,如果調用鏈過長則響應時間等于每次調用的時間之和。
  3. 資源浪費:調用鏈中的每個服務在等待響應過程中,不能釋放請求占用的資源,高并發場景下會極度浪費系統資源
  4. 級聯失敗:如果服務提供者出現問題,所有調用方都會跟著出問題,如同多米諾骨牌一樣,迅速導致整個微服務群故障

異步通訊

發送方發出數據后,不等接收方發回響應,接著發送下個數據包的通訊方式

其常見的實線就是事件驅動模式

可以實現服務解耦的問題,性能得到提升,吞吐量提高,服務沒有強依賴性,不必擔心級聯失敗問題,實現服務削峰

異步通信的問題

  1. 依賴于Broker的可靠性、安全性、吞吐能力
  2. 架構復雜了,業務沒有明顯的流程線,不好追蹤管理

什么是MQ

N (Message Quene):翻譯為j消息隊列,通過典型的生產者和消費者模型生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的侵入輕松地實現系統間解耦。別名為消息中間件,通過利用高效可靠的消息傳遞機制進行平臺無關的數據交流,并基于數據通信來進行分布式系統的集成。

1.ActiveMQ

ActiveM 是A4pache出品,最流行的,能力強勁的開源消息總線。它是一個完全支持/8規范的的消息中間件。豐富的API,多種集群架構模式讓認kctiveMA在業界成為老牌的消息中間件,在中小型企業頗受歡迎!

2.Kafka

Kafka是LinkedIn開源的分布式發布-訂閱消息系統,目前歸屬于Apache頂級項目。Kafka主要特點是基于Pu11的模式來處理消息消費,追求高吞吐量,一開始的目的就是用于日志收集和傳輸。8.8版本開始支持復制,不支持事務,對消息的重復、丟失、錯誤沒有嚴格要求,適合產生大量數據的互聯網服務的數據收集業務。

3.RocketMQ

RocketNQ是阿里開源的消息中間件,它是純JAVA開發,具有高吞吐量、高可用性、適合大規模分布式系統應用的特點。RocketNO思路起源于Kafka,但并不是Kafka的一個Copy,它對消息的可靠傳輸及事務性做了優化,目前在阿里集團被廣泛應用于交易、充值、流計算、消息推送、日志流式處理、binglog分發等場景。

4.RabbitMQ

RabbitNQ是使用Erlang語言開發的開源消息隊列系統,基于ANQP協議來實現。ANQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。ANQP協議更多用在企業系統內對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。

Docker安裝RabbitMQ

docker pull rabbitmq
//啟動
docker run 
-e RABBITMQ_DEFAULT_USER=syf20020816 
-e RABBITMQ_DEFAULT_PASS=20020816 
--name mq 
--hostname mq1 
-p 15672:15672 
-p 5672:5672 
-d 
rabbitmq:3.10-management

 

訪問15672端口

 

 

RabbitMq架構

 

  • channel:操作MQ的工具
  • exchange:路由消息到隊列中
  • queue:緩存消息
  • virtual host:虛擬主機,是對queue、exchange等資源的邏輯分組

QuickStart

簡單項目中(點對點的簡單模型)

 

1.引入依賴

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.15.0</version>
</dependency>

2.建立虛擬主機

 

添加完成后如圖:

 

3.建立生產者

/**
 * 直連式連接
 */
public class Publisher {
 
    public static void main(String[] args) throws IOException, TimeoutException {
 
        //創建連接mq的連接工廠對象
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        //設置主機
        connectionFactory.setHost("192.168.112.101");
        //設置端口號
        connectionFactory.setPort(5672);
        //設置連接虛擬主機
        connectionFactory.setVirtualHost("/test");
        //設置虛擬主機的用戶名密碼
        connectionFactory.setUsername("ssf2asdas6");
        connectionFactory.setPassword("204545454");
        //獲取連接對象
        final Connection connection = connectionFactory.newConnection();
        //獲取連接中的通道
        final Channel channel = connection.createChannel();
        //通道綁定對應的消息隊列
        //參數1:隊列名稱,不存在則創建
        //參數2:定義隊列特性是否需要持久化,true:持久化,false:非持久化
        //參數3:是否獨占隊列
        //參數4:是否在消費完成后刪除隊列
        //參數5:拓展附加參數
        channel.queueDeclare("demo1",false,false,false,null);
        //發布消息
        //參數1:交換機名稱(exchange)
        //參數2:隊列名稱
        //參數3:傳遞消息額外設置
        //參數4:消息的具體內容
        channel.basicPublish("","demo1",null,"hello world".getBytes());
        //關閉資源
        channel.close();
        connection.close();

    }
}

你可以看到在隊列里就多了一條消息了

 

4.建立消費者

public class Consumer {
 
    public static void main(String[] args) throws IOException, TimeoutException {
 
        //創建鏈接工廠
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.112.101");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setUsername("ssf2asdas6");
        connectionFactory.setPassword("204545454");
        //創建連接對象
        final Connection connection = connectionFactory.newConnection();
        //創建通道
        final Channel channel = connection.createChannel();
        channel.queueDeclare("demo1",false,false,false,null);
        //消費消息
        //參數1:消費隊列的名稱
        //參數2:開啟消息的自動確認機制
        //參數3:消費時的回調接口
        channel.basicConsume("demo1",true, new DefaultConsumer(channel){
 
            //最后一個參數:消息隊列中取出的消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
                System.out.println(new String(body));
            }
        });
        //關閉資源,若不關閉則一直進行監聽
        channel.close();
        connection.close();
    }
}

 

消費后就看到這里隊列里的消息就清零了

 

SpringAMQP

Spring AMQP是基于AMQP協議定義的一套API規范,提供了模板來發送和接收消息。包含兩部分,其中spring-amqp是基礎抽象,spring-rabbit是底層的默認實現。

官方網址

https://spring.io/projects/spring-amqp

QuickStart

1. 初始化一個簡單微服務項目

 

結構如下

 

2.編寫yaml配置

無論你的消息發布者還是消息消費者都需要使用以下yaml配置

spring:
  rabbitmq:
    host: 192.168.112.101
    port: 5672
    virtual-host: /test
    username: sysdaa6
    password: 20asdsa16

3.發送消息

這里注意的是,你的隊列一定要是存在的

@SpringBootTest
class PublisherApplicationTests {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
 
        rabbitTemplate.convertAndSend("demo1","hello springAMQP!");

    }
}

4.接收消息(消費)

@Component
public class SpringRabbitListener {
 
    @RabbitListener(queues = "demo1")
    public void listenQueue(String msg){
 
        System.out.println(msg);
    }
}

5.啟動測試

 

WorkQueue模型

使用工作隊列模型

工作隊列,可以提高消息處理速度,避免隊列消息堆積

 

消息預取機制

由于消息預取機制的存在,消息會平均地分配給每一個消費者

 

修改消費者的yaml修改預取機制

spring:
  rabbitmq:
    host: 192.168.112.101
    port: 5672
    virtual-host: /test
    username: your username
    password: your password
    listener:
      simple:
        prefetch: 1 #表示每次只能獲取一條消息,處理完才能獲取下一條

消息發送者

@SpringBootTest
class PublisherApplicationTests {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testWorkQueue() throws InterruptedException {
 
        String queueName = "demo1";
        String msg = "this is the msg:";
        for (int i = 0; i <= 50; i++) {
 
            rabbitTemplate.convertAndSend(queueName,msg+i);
            Thread.sleep(50);
        }
    }
}

消息消費者

@Component
public class SpringRabbitListener {
 

    @RabbitListener(queues = "demo1")
    public void listenWorkQueue(String msg) throws InterruptedException {
 
        System.out.println("=====consumer 1:=====|"+ LocalDateTime.now());
        System.out.println(msg);
        Thread.sleep(50);
    }

    @RabbitListener(queues = "demo1")
    public void listenWorkQueue2(String msg) throws InterruptedException {
 
        System.err.println("=====consumer 2:=====|"+ LocalDateTime.now());
        System.err.println(msg);
        Thread.sleep(500);
    }
}

 

發布訂閱模式

發布訂閱模式與之前案例的區別就是允許將同一消息發送給多個消費者。實現方式是加入了exchange(交換機)

exchange負責消息路由,而不是存儲,路由失敗則消息丟失

 

常見exchange類型包括

  1. Fanout:廣播
  2. Direct:路由
  3. Topic:話題

交換機的作用

  1. 接收publisher發送的消息
  2. 將消息按照規則路由到與之綁定的隊列
  3. 不能緩存消息,路由失敗,消息丟失
  4. FanoutExchange的會將消息路由到每個綁定的隊列

消息消費者

定義一個 FanoutConfig 配置類進行交換機和隊列的綁定

@Configuration
public class FanoutConfig {
 
    //交換機
    @Bean
    public FanoutExchange fanoutExchange() {
 
        return new FanoutExchange("test.fanout");
    }

    //隊列1
    @Bean
    public Queue fanoutQueue1() {
 
        return new Queue("fanout.queue1");
    }

    //綁定隊列1到交換機
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
 
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    //隊列2
    @Bean
    public Queue fanoutQueue2() {
 
        return new Queue("fanout.queue2");
    }

    //綁定隊列2到交換機
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
 
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

消息發送者

@Test
    void testSendFanoutExchange(){
 
        //交換機名稱
        String exchangeName = "test.fanout";
        //消息
        String msg = "test for send FanoutExchange";
        //發送消息
        rabbitTemplate.convertAndSend(exchangeName,"",msg);
    }

 


 

 

路由模式

Direct Exchange 會將接收到的消息根據規則路由到指定的Queue,因此稱為路由模式(routes)。

  • 每一個Queue都與Exchange設置一個BindingKey
  • 發布者發送消息時,指定消息的RoutingKey
  • Exchange將消息路由到BindingKey與消息RoutingKey一致的隊列

 

消息發送者

消息發送者中指明routerKey

@Test
    void testSendDirectExchange(){
 
        //交換機名稱
        String exchangeName = "test.direct";
        //消息
        String msg = "test for router to black queue";
        //發送消息
        rabbitTemplate.convertAndSend(exchangeName,"black",msg);
    }
    @Test
    void testSendDirectExchange2(){
 
        //交換機名稱
        String exchangeName = "test.direct";
        //消息
        String msg = "test for router to white queue";
        //發送消息
        rabbitTemplate.convertAndSend(exchangeName,"white",msg);
    }

消息消費者

定義一個監聽組件使用注解形式指定隊列名稱,交換機名稱和類型(默認direct),以及路由通道

@Component
public class SpringRabbitListener {
 

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "test.direct",type = ExchangeTypes.DIRECT),
            key = {
 "black","white"}
    ))
    public void listenDirectQueue1(String msg){
 
        System.out.println("direct queue1:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "test.direct",type = ExchangeTypes.DIRECT),
            key = {
 "black","green"}
    ))
    public void listenDirectQueue2(String msg){
 
        System.out.println("direct queue2:"+msg);
    }
}

 


 


 

話題模式

TopicExchange與DirectExchange類似,區別在于routingKey必須是多個單詞的列表,并且以 . 分割。

如: person.zhangsan

Queue與Exchange指定BindingKey時可以使用通配符:

  • #:代指0個或多個單詞
  • *:代指一個單詞

 

消息發送者

@Test
    public void  testTopicExchange(){
 
        //交換機名稱
        String exchangeName = "test.topic";
        //消息
        String msg = "test for topic in china shanghai";
        //發送消息
        rabbitTemplate.convertAndSend(exchangeName,"china.shanghai",msg);
    }

消息消費者

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name ="test.topic",type = ExchangeTypes.TOPIC),
            key = "china.shanghai"
    ))
    public void listenTopicQueue1(String msg){
 
        System.out.println("topic:china.shanghai:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name ="test.topic",type = ExchangeTypes.TOPIC),
            key = "american.newyork"
    ))
    public void listenTopicQueue2(String msg){
 
        System.out.println("topic:american.newyork:"+msg);
    }
結果

 


 

分享到:
標簽:微服
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定