先熟悉下面會用到的一些名詞~
- exchange: 交換機
- routingkey: 路由key
- queue: 隊列
exchange和queue是需要綁定在一起的,然后消息發送到exchange再由exchange通過routingkey發送到對應的隊列中。
exchange分四種
Default Exchange
這種是特殊的Direct Exchange,是rabbitmq內部默認的一個交換機。該交換機的name是空字符串,所有queue都默認binding 到該交換機上。所有binding到該交換機上的queue,routing-key都和queue的name一樣。
注意: 這就是為什么你直接創建一個queue也能正常的生產與消費,因為對應的exchange是RabbitMQ默認的,routingkey就是該隊列的名字
Topic Exchange
通配符交換機,exchange會把消息發送到一個或者多個滿足通配符規則的routing-key的queue。其中表號匹配一個word,#匹配多個word和路徑,路徑之間通過.隔開。如滿足a..c的routing-key有a.hello.c;滿足#.hello的routing-key有a.b.c.helo。
Fanout Exchange
扇形交換機,該交換機會把消息發送到所有binding到該交換機上的queue。這種是publisher/subcribe模式。用來做廣播最好。
所有該exchagne上指定的routing-key都會被ignore掉。
Header Exchange
設置header attribute參數類型的交換機。
簡單的了解之后,下面就是延遲隊列的實現方式
延遲隊列的實現
延遲分兩種
- 在msg上設置過期時間
- 在隊列上設置過期時間
一定要看懂這張圖!!!
如上圖創建三個exchange和三個隊列
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
@Bean
public DirectExchange processExchange() {
return new DirectExchange(PROCESS_EXCHANGE_NAME);
}
@Bean
public DirectExchange delayQueueExchange() {
return new DirectExchange(DELAY_QUEUE_EXCHANGE_NAME);
}
/**
* 存放延遲消息的隊列 最后將會轉發給exchange(實際消費隊列對應的)
* @return
*/
@Bean
Queue delayQueue4Msg(){
return QueueBuilder.durable(DELAY_QUEUE_MSG)
.withArgument("x-dead-letter-exchange", PROCESS_EXCHANGE_NAME)
.withArgument("x-dead-letter-routing-key", ROUTING_KEY)
.build();
}
@Bean
public Queue processQueue() {
return QueueBuilder.durable(PROCESS_QUEUE)
.build();
}
/**
* 存放消息的延遲隊列 最后將會轉發給exchange(實際消費隊列對應的)
* @return
*/
@Bean
public Queue delayQueue4Queue() {
return QueueBuilder.durable(DELAY_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", PROCESS_EXCHANGE_NAME) // DLX
.withArgument("x-dead-letter-routing-key", ROUTING_KEY)
.withArgument("x-message-ttl", 3000) // 設置隊列的過期時間 單位毫秒
.build();
}
接下來將每個exchange和對應的mq綁定
@Bean
Binding delayBinding() {
return BindingBuilder.bind(delayQueue4Msg())
.to(delayExchange())
.with(ROUTING_KEY);
}
@Bean
Binding queueBinding() {
return BindingBuilder.bind(processQueue())
.to(processExchange())
.with(ROUTING_KEY);
}
@Bean
Binding delayQueueBind() {
return BindingBuilder.bind(delayQueue4Queue())
.to(delayQueueExchange())
.with(ROUTING_KEY);
}
發送消息的方式
public void sendDelayMsg(Msg msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(msg.getId() + " 延遲消息發送時間:" + sdf.format(new Date()));
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME, "delay", msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(msg.getTtl() + "");
return message;
}
});
}
public void sendDelayQueue(Msg msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(msg.getId() + " 延遲隊列消息發送時間:" + sdf.format(new Date()));
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_QUEUE_EXCHANGE_NAME,"delay", msg);
}
驗證結果
為每個消息設置過期時間
為隊列設置過期時間
如果你把設置了過期時間的消息發送到設置了過期時間的隊里中的時候,以最短的時間為準。
最后
其實我在實現的過程中也花了很長的時間,主要就是被exchange和queue搞亂掉了,最后索性自己畫了個圖,按照圖來一個一個創建與綁定。之后就很清晰很容易的實現了。
強調!!! 如果在開發的過程中發現exchange和queue綁定錯誤了,建議從管理界面將queue和exchange unbind或者刪除重新創建!






