環境搭建
// maven
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
//Application
spring.application.name=rabbitmq-demo
server.port= 9099
spring.rabbitmq.host=主機
spring.rabbitmq.port=5672
spring.rabbitmq.username=test-user
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=/test-v //虛擬主機
代碼操作
直連工作模式
// 生產者
@RestController
public class RabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("createMQ")
public void createMQ(){
// 隊列名稱 沒有則創建 發送消息信息
rabbitTemplate.convertAndSend("test-q","hello");
}
}
//消費者
@Component
// 參數 隊列名稱 是否持久化 是否自動刪除
@RabbitListener(queuesToDeclare = @Queue(value = "test-q",durable = "true",autoDelete = "true"))
// 此處有默認值 可以只寫隊列名稱,其它參數默認值為持久化 非獨占 非自動刪除
public class CustomerMQ {
@RabbitHandler
public void receive(String msg){
System.out.println("msg is "+msg);
}
}
work模型
默認輪詢
該模型中消息隊列只負責將消息按輪詢發送至所有監聽中的消費者, 與消費者的內部邏輯毫無關系
@GetMapping("workMQ")
public void workMQ(){
// 循環10
for (int i = 0; i < 10; i++) {
// 隊列名稱 沒有則創建(隨便起) 發送消息內容
rabbitTemplate.convertAndSend("work","hello");
}
}
//work模型消費者監聽類
@Component
public class WorkCustomer {
// 注意 此處 rabbitListener是作用在方法上了 這樣編程更加容易
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive(String msg) throws InterruptedException {
//此處模擬消費者1執行的慢 消費者2 執行的快
//很顯然執行結果與消費者執行快慢沒有聯系 是每消費者執行5條
Thread.sleep(2000);
System.out.println("receive1 msg is "+msg);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String msg){
System.out.println("receive2 msg is "+msg);
}
}
按勞分配
顧名思義,此策略下的消息隊列分配消息是與 消費者的邏輯有直接聯系的, 消費者需要消費完該消息并 反饋消息隊列 ,消息隊列才會繼續發送消息至該消費者。
// 代碼與上面一致
application.properties 添加一行 限制接收消息
spring.rabbitmq.listener.simple.prefetch=1
//即可
廣播模型(fanout)
該場景是經典的廣播模式,消費者的消息將會發送至所有 監聽中的消費者
//生產者 接口
@GetMapping("fanoutMQ")
public void fanoutMQ(){
for (int i = 0; i < 10; i++) {
// 第一個參數交換機名稱 與消費者對應即可
//第二個參數 隊列名稱 此處使用臨時隊列 不需要值 第三參數 發送消息內容
rabbitTemplate.convertAndSend("test-fanout","","hello");
}
}
//消費者類
@Component
public class FanoutCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,// 代表使用的臨時隊列
// value 交換機名稱 與生產者對應 type 交換機類型 fanout
exchange = @Exchange(value= "test-fanout",type = "fanout")
)
})
public void receive(String msg) throws InterruptedException {
Thread.sleep(2000);
System.out.println("receive1 msg is "+msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,// 代表使用的臨時隊列
// value 交換機名稱 與生產者對應 type 交換機類型 fanout
exchange = @Exchange(value= "test-fanout",type = "fanout")
)
})
public void receive2(String msg){
System.out.println("receive2 msg is "+msg);
}
}
路由模型
該模型使用固定的路由key 來選擇發送消息至所有監聽的消費者
// 生產者
@GetMapping("directMQ")
public void directMQ(){
// 第一個參數交換機名稱 與消費者對應即可 第二個參數 路由key 該參數決定發送至哪個消費者
// 第三參數 發送消息內容
rabbitTemplate.convertAndSend("test-direct","key1","hello");
}
//消費者
// 說明: 此處定義四個監聽消費者 一監聽的key:key1 key2 key 3 二: key1 三: key2
//四 :key3
/**
此處消費者發送是的key1 很明顯消息只可能被 消費者一和消費者二接收到
*/
@Component
public class DirectCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,// 代表使用的臨時隊列
// value 交換機名稱 與生產者對應 type 交換機類型 fanout
exchange = @Exchange(value= "test-direct",type = "direct") ,
// 該參數默認類型就是direct
key = {
"key1","key2","key3"}
)
})
public void receive(String msg) throws InterruptedException {
System.out.println("receive1 msg is "+msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,// 代表使用的臨時隊列
// value 交換機名稱 與生產者對應 type 交換機類型 fanout
exchange = @Exchange(value= "test-direct",type = "direct") ,
// 該參數默認類型就是direct
key = {
"key1"}
)
})
public void receive2(String msg){
System.out.println("receive2 msg is "+msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,// 代表使用的臨時隊列
// value 交換機名稱 與生產者對應 type 交換機類型 fanout
exchange = @Exchange(value= "test-direct",type = "direct") ,
// 該參數默認類型就是direct
key = {
"key2"} )
})
public void receive3(String msg) throws InterruptedException {
Thread.sleep(2000);
System.out.println("receive1 msg is "+msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,// 代表使用的臨時隊列
// value 交換機名稱 與生產者對應 type 交換機類型 fanout
exchange = @Exchange(value= "test-direct",type = "direct") ,
// 該參數默認類型就是direct
key = {
"key3"}
)
})
public void receive4(String msg){
System.out.println("receive2 msg is "+msg);
}
}
topic 模型
topic模型與路由一致,就是將固定的路由key改成路由通配符,其實原理都一致,mq提供的路由通配符 * 和 #
*:匹配一個任意的單詞
#: 匹配多個任意的單詞
舉例: 生產者發送的路由key : “test.topic”
消費者1設置的路由key :"test.* " , 消費者2設置的路由key: "test.# "
那么兩個消費者都可以接收到該消息 ,但是當生產者將路由key改為 test.topic.one
這時只有消費者2可以接收到消息 因為#通配符可以接收任意多個單詞
//生產者
@GetMapping("topicMQ")
public void topicMQ(){
rabbitTemplate.convertAndSend("test-topic","key.topic","hello");
}
//消費者
@Component
public class TopicCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,// 代表使用的臨時隊列
exchange = @Exchange(value= "test-topic",type = "topic") ,
// 該參數默認類型就是direct
key = {
"key.#","key.*"}
)
})
public void receive(String msg) throws InterruptedException {
System.out.println("receive msg is "+msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,// 代表使用的臨時隊列
exchange = @Exchange(value= "test-topic",type = "topic") ,
// 該參數默認類型就是direct
key = {
"key.#"}
)
})
public void receive1(String msg) throws InterruptedException {
System.out.println("receive1 msg is "+msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,// 代表使用的臨時隊列
exchange = @Exchange(value= "test-topic",type = "topic") ,
// 該參數默認類型就是direct
key = {
"key.*"}
)
})
public void receive2(String msg) throws InterruptedException {
System.out.println("receive2 msg is "+msg);
}
來源:
https://blog.csdn.NET/pgcdnameming/article/details/126666982






