老师,为什么 user.* 和 user.# 均能收到 user.delete.abc 这条消息。源码来自于老师提供的案例。
来源:2-8 Rabbitmq核心API-Exchange之Topic
guaguaerhao
2021-08-22 11:25:56
问题描述:
为什么 user.* 和 user.# 均能收到 user.delete.abc 这条消息
操作:分别单独启动某一个消费者,再启动消息提供者
相关截图:
1.只启动 user.* 的消费者类,可以接收到 user.save、user.update、user.delete.abc 这些消息
2.只启动 user.# 的消费者类,可以接收到 user.save、user.update、user.delete.abc 这些消息
相关代码:(消费者2 的代码改了好几遍,就不代码缩减没了,不知道为啥)
// 消费者1,routingKey 为 user.* Receiver4TopicExchange1.java
public class Receiver4TopicExchange1 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("121.4.70.250");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey = "user.*";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
System.err.println("routingKey: " +routingKey + ", consumer1 start.. ");
// 循环获取消息
while(true){
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());
}
}
}
// 消费者2,routingKey 为 user.# Receiver4TopicExchange2.java
public class Receiver4TopicExchange2 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("121.4.70.250");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey = "user.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
System.err.println("routingKey: " +routingKey + ", consumer2 start.. ");
// 循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());
}
}
}
// 消息提供者,发送了三条消息,routingKey 分别为
// user.update
// user.save
// user.delete.abc
public class Sender4TopicExchange {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("121.4.70.250");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 发送
String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
connection.close();
}
}
2回答
因为你一个topic绑定了两个规则,你可以在控制台删除user.# 再试试能不能消费user.xxx.xxx
就不行了哦
qq_范特西_8
2023-03-05
就是你同一个队列,绑定了两个规则,所以这两个消费者无论谁都可以随机消费,但如果两个规则对应两个不同的队列,两个消费者也消费各自的队列,这时候就按匹配规则来了,
另外,如果两个消费者监听同一个队列,那么他俩会均摊消息不回重复消费,但如果监听各自不同的队列,而且发送方也都能发送给这两个队列,消费者也消费两个不同的队列,那这个时候就各自消费各自的了
举个例子,exchange发了3个消息,按照这个事例的话,那就一个收到两条消息,一个收到三条消息
相似问题
回答 3
回答 1