老师,为什么 user.* 和 user.# 均能收到 user.delete.abc 这条消息。源码来自于老师提供的案例。

来源:2-8 Rabbitmq核心API-Exchange之Topic

guaguaerhao

2021-08-22 11:25:56

问题描述:

为什么 user.* 和 user.# 均能收到 user.delete.abc 这条消息


操作:分别单独启动某一个消费者,再启动消息提供者



相关截图:

1.只启动 user.* 的消费者类,可以接收到 user.save、user.update、user.delete.abc 这些消息

http://img.mukewang.com/climg/6121c22d098b447719720438.jpg


2.只启动 user.# 的消费者类,可以接收到 user.save、user.update、user.delete.abc 这些消息

http://img.mukewang.com/climg/6121c19009457c6218340402.jpg



相关代码:(消费者2 的代码改了好几遍,就不代码缩减没了,不知道为啥)

// 消费者1,routingKey 为 user.* Receiver4TopicExchange1.java
public class Receiver4TopicExchange1 {
   public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("121.4.70.250");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
      //4 声明
      String exchangeName = "test_topic_exchange";
      String exchangeType = "topic";
      String queueName = "test_topic_queue";
      String routingKey = "user.*";
      channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
      channel.queueDeclare(queueName, false, false, false, null);
      channel.queueBind(queueName, exchangeName, routingKey);

        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        System.err.println("routingKey: " +routingKey + ", consumer1 start.. ");
        // 循环获取消息
        while(true){
            // 获取消息,如果没有消息,这一步将会一直阻塞
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());
        }
   }
}



// 消费者2,routingKey 为 user.# Receiver4TopicExchange2.java
public class Receiver4TopicExchange2 {
   public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("121.4.70.250");
        connectionFactory.setPort(5672);
      connectionFactory.setVirtualHost("/");

        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
      //4 声明
      String exchangeName = "test_topic_exchange";
      String exchangeType = "topic";
      String queueName = "test_topic_queue";
      String routingKey = "user.#";
      channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
      channel.queueDeclare(queueName, false, false, false, null);
      channel.queueBind(queueName, exchangeName, routingKey);

        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        System.err.println("routingKey: " +routingKey + ", consumer2 start.. ");
        // 循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());
        }
   }
}



// 消息提供者,发送了三条消息,routingKey 分别为
// user.update
// user.save
// user.delete.abc
public class Sender4TopicExchange {
   public static void main(String[] args) throws Exception {
      //1 创建ConnectionFactory
      ConnectionFactory connectionFactory = new ConnectionFactory();
      connectionFactory.setHost("121.4.70.250");
      connectionFactory.setPort(5672);
      connectionFactory.setVirtualHost("/");
      //2 创建Connection
      Connection connection = connectionFactory.newConnection();
      //3 创建Channel
      Channel channel = connection.createChannel();
      //4 声明
      String exchangeName = "test_topic_exchange";
      String routingKey1 = "user.save";
      String routingKey2 = "user.update";
      String routingKey3 = "user.delete.abc";
      //5 发送
      String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
      channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
      channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
      channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
      channel.close();
        connection.close();
   }
}



写回答

2回答

阿神

2021-08-23

因为你一个topic绑定了两个规则,你可以在控制台删除user.# 再试试能不能消费user.xxx.xxx
就不行了哦

0

qq_范特西_8

2023-03-05

就是你同一个队列,绑定了两个规则,所以这两个消费者无论谁都可以随机消费,但如果两个规则对应两个不同的队列,两个消费者也消费各自的队列,这时候就按匹配规则来了,

另外,如果两个消费者监听同一个队列,那么他俩会均摊消息不回重复消费,但如果监听各自不同的队列,而且发送方也都能发送给这两个队列,消费者也消费两个不同的队列,那这个时候就各自消费各自的了

举个例子,exchange发了3个消息,按照这个事例的话,那就一个收到两条消息,一个收到三条消息

0

Java架构师-技术专家

千万级电商项目从0到100全过程,覆盖Java程序员不同成长阶段的核心问题与解决方案

2672 学习 · 5839 问题

查看课程