亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.430618.com 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

先熟悉下面會用到的一些名詞~

  • exchange: 交換機
  • routingkey: 路由key
  • queue: 隊列

exchange和queue是需要綁定在一起的,然后消息發送到exchange再由exchange通過routingkey發送到對應的隊列中。

一張圖帶你理解和實現RabbitMQ的延遲隊列功能

exchange分四種

Default Exchange

這種是特殊的Direct Exchange,是rabbitmq內部默認的一個交換機。該交換機的name是空字符串,所有queue都默認binding 到該交換機上。所有binding到該交換機上的queue,routing-key都和queue的name一樣。

注意: 這就是為什么你直接創建一個queue也能正常的生產與消費,因為對應的exchange是RabbitMQ默認的,routingkey就是該隊列的名字

Topic Exchange

通配符交換機,exchange會把消息發送到一個或者多個滿足通配符規則的routing-key的queue。其中表號匹配一個word,#匹配多個word和路徑,路徑之間通過.隔開。如滿足a..c的routing-key有a.hello.c;滿足#.hello的routing-key有a.b.c.helo。

Fanout Exchange

扇形交換機,該交換機會把消息發送到所有binding到該交換機上的queue。這種是publisher/subcribe模式。用來做廣播最好。

所有該exchagne上指定的routing-key都會被ignore掉。

Header Exchange

設置header attribute參數類型的交換機。

簡單的了解之后,下面就是延遲隊列的實現方式

延遲隊列的實現

延遲分兩種

  • 在msg上設置過期時間
  • 在隊列上設置過期時間
一張圖帶你理解和實現RabbitMQ的延遲隊列功能

一定要看懂這張圖!!!

如上圖創建三個exchange和三個隊列

@Bean

public DirectExchange delayExchange() {

return new DirectExchange(DELAY_EXCHANGE_NAME);

}

@Bean

public DirectExchange processExchange() {

return new DirectExchange(PROCESS_EXCHANGE_NAME);

}

@Bean

public DirectExchange delayQueueExchange() {

return new DirectExchange(DELAY_QUEUE_EXCHANGE_NAME);

}

/**

* 存放延遲消息的隊列 最后將會轉發給exchange(實際消費隊列對應的)

* @return

*/

@Bean

Queue delayQueue4Msg(){

return QueueBuilder.durable(DELAY_QUEUE_MSG)

.withArgument("x-dead-letter-exchange", PROCESS_EXCHANGE_NAME)

.withArgument("x-dead-letter-routing-key", ROUTING_KEY)

.build();

}

@Bean

public Queue processQueue() {

return QueueBuilder.durable(PROCESS_QUEUE)

.build();

}

/**

* 存放消息的延遲隊列 最后將會轉發給exchange(實際消費隊列對應的)

* @return

*/

@Bean

public Queue delayQueue4Queue() {

return QueueBuilder.durable(DELAY_QUEUE_NAME)

.withArgument("x-dead-letter-exchange", PROCESS_EXCHANGE_NAME) // DLX

.withArgument("x-dead-letter-routing-key", ROUTING_KEY)

.withArgument("x-message-ttl", 3000) // 設置隊列的過期時間 單位毫秒

.build();

}

接下來將每個exchange和對應的mq綁定

@Bean

Binding delayBinding() {

return BindingBuilder.bind(delayQueue4Msg())

.to(delayExchange())

.with(ROUTING_KEY);

}

@Bean

Binding queueBinding() {

return BindingBuilder.bind(processQueue())

.to(processExchange())

.with(ROUTING_KEY);

}

@Bean

Binding delayQueueBind() {

return BindingBuilder.bind(delayQueue4Queue())

.to(delayQueueExchange())

.with(ROUTING_KEY);

}

發送消息的方式

public void sendDelayMsg(Msg msg) {

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

System.out.println(msg.getId() + " 延遲消息發送時間:" + sdf.format(new Date()));

rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME, "delay", msg, new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

message.getMessageProperties().setExpiration(msg.getTtl() + "");

return message;

}

});

}

public void sendDelayQueue(Msg msg) {

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

System.out.println(msg.getId() + " 延遲隊列消息發送時間:" + sdf.format(new Date()));

rabbitTemplate.convertAndSend(RabbitConfig.DELAY_QUEUE_EXCHANGE_NAME,"delay", msg);

}

驗證結果

為每個消息設置過期時間

一張圖帶你理解和實現RabbitMQ的延遲隊列功能

為隊列設置過期時間

一張圖帶你理解和實現RabbitMQ的延遲隊列功能

如果你把設置了過期時間的消息發送到設置了過期時間的隊里中的時候,以最短的時間為準。

最后

其實我在實現的過程中也花了很長的時間,主要就是被exchange和queue搞亂掉了,最后索性自己畫了個圖,按照圖來一個一個創建與綁定。之后就很清晰很容易的實現了。

強調!!! 如果在開發的過程中發現exchange和queue綁定錯誤了,建議從管理界面將queue和exchange unbind或者刪除重新創建!

分享到:
標簽:RabbitMQ
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定