RabbitMQ消息分发模式----"Topic"主题模式

xiaoxiao2021-02-28  92

前面虽然有Direct类型和Fanout的转换器。但它们仍然有一定的局限性——不能根据多重条件进行路由选择。

Topic exchange(主题转发器)

发送给主题转发器的消息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符。这些标识符可以是随意,但是通常跟消息的某些特性相关联。一些合法的路由选择键比如“socket.usd.nyse”,"nyse.vmw","quick.orange.rabbit",你愿意用多少单词都可以,只要不超过上限的255个字节。

绑定键也必须以相同的格式。主题转发器的逻辑类似于direct类型的转发器。消息通过一个特定的路由键发送到所有与绑定键匹配的队列中。需要注意的是,关于绑定键有两种特殊的情况:*(星号)可以代替一个任意标识符 ;#(井号)可以代替零个或多个标识符。如下图:

  在上图例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个是物种:“速度.颜色.物种”。

我们创建3个绑定:Q1绑定键是“*.orange.*”,Q2绑定键是“*.*.rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。

路由键为“quick.orange.rabbit”的消息会被路由到2个队列中去。而“lazy.orange.elephant”的消息同样会发往2个队列。另外“quick.orange.fox” 仅仅发往第一个队列,而"lazy.brown.fox"则只发往第二个队列。“quick.brown.fox”则所有的绑定键都不匹配而被丢弃。

如果我们违反约定,发送了只带1个或者4个标识符的选择键,像“orange”或者“quick.orange.male.rabbit”,会发生什么呢?这些消息都不匹配任何绑定,所以将被丢弃。

       另外,“lazy.orange.male.rabbit”,尽管有4个标识符,但是仍然匹配最后一个绑定键,所以会发送到第二个队列中。

       注:主题类型的转发器非常强大,可以实现其他类型的转发器。当队列绑定#绑定键,可以接受任何消息,类似于fanout转发器。当特殊字符*和#不包含在绑定键中,这个主题转发器就像一个direct类型的转发器。

完整例子:

发送端代码:

[java]  view plain  copy package cn.rabbitmq.topic;   import cn.rabbitmq.util.ConnectionUtil;   import com.rabbitmq.client.Channel;   import com.rabbitmq.client.Connection;   public class SendTest {       private final static String EXCHANGE_NAME = "test_exchange_topic";       public static void main(String[] argv) throws Exception {           // 获取到连接以及mq通道           Connection connection =ConnectionUtil.getConnection();           Channel channel = connection.createChannel();           // 声明exchange           channel.exchangeDeclare(EXCHANGE_NAME, "topic");           // 消息内容           String message = "Hello World!";           channel.basicPublish(EXCHANGE_NAME, "key.1.2"null, message.getBytes());           System.out.println(" [x] Sent '" + message + "'");           channel.close();           connection.close();       }   }   接收端1: [java]  view plain  copy package cn.rabbitmq.topic;   import cn.rabbitmq.util.ConnectionUtil;   import com.rabbitmq.client.Channel;   import com.rabbitmq.client.Connection;   import com.rabbitmq.client.QueueingConsumer;   public class RecvTest1 {       private final static String QUEUE_NAME = "test_queue_topic_work";       private final static String EXCHANGE_NAME = "test_exchange_topic";       public static void main(String[] argv) throws Exception {           // 获取到连接以及mq通道           Connection connection = ConnectionUtil.getConnection();           Channel channel = connection.createChannel();           // 声明队列           channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);           // 绑定队列到交换机           channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");           // 同一时刻服务器只会发一条消息给消费者           channel.basicQos(1);           // 定义队列的消费者           QueueingConsumer consumer = new QueueingConsumer(channel);           // 监听队列,手动返回完成           channel.basicConsume(QUEUE_NAME, false, consumer);           // 获取消息          while (true) {               QueueingConsumer.Delivery delivery = consumer.nextDelivery();               String message = new String(delivery.getBody());               System.out.println(" [x] Received '" + message + "'");               Thread.sleep(10);               channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);           }       }   }   接收端2: [java]  view plain  copy package cn.rabbitmq.topic;   import cn.rabbitmq.util.ConnectionUtil;   import com.rabbitmq.client.Channel;   import com.rabbitmq.client.Connection;   import com.rabbitmq.client.QueueingConsumer;   public class RecvTest2 {       private final static String QUEUE_NAME = "test_queue_topic_work2";       private final static String EXCHANGE_NAME = "test_exchange_topic";       public static void main(String[] argv) throws Exception {           // 获取到连接以及mq通道           Connection connection =ConnectionUtil.getConnection();           Channel channel = connection.createChannel();           // 声明队列           channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);           // 绑定队列到交换机           channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.#");           // 同一时刻服务器只会发一条消息给消费者           channel.basicQos(1);           // 定义队列的消费者           QueueingConsumer consumer = new QueueingConsumer(channel);           // 监听队列,手动返回完成           channel.basicConsume(QUEUE_NAME, false, consumer);           // 获取消息           while (true) {               QueueingConsumer.Delivery delivery = consumer.nextDelivery();               String message = new String(delivery.getBody());               System.out.println(" [x] Received '" + message + "'");               Thread.sleep(10);               channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);           }       }   }   此时先自动生产者在启动两个消费者:可以看到消费者1接收不到消息,消费者2可以接收到消息。当把生产者的key.1.2改为key.1时可以发现两个消费者都能接收到消息。
转载请注明原文地址: https://www.6miu.com/read-17843.html

最新回复(0)