在上一篇文章中,我们讲解了RabbitMQ中的AMQP协议,对RabbitMQ中的基本概念和工作流程有了大致的了解,同时也知道了RabbitMQ当中的重要组件,如果你忘记了内容,可以先复习一下,再学习接下来的内容。RabbitMQ学习(二)——AMQP协议。
好了,接下来就进入本文的主要内容RabbitMQ中的交换机(Exchange)。我们已经知道了RabbitMQ中的交换机有四种类型,分为:
Direct Exchange(直连交换机)Fanout Exchange(扇型交换机)Topic Exchange(主题交换机)Headers Exchange(头交换机)我们知道在RabbitMQ工作流程当中,当我们要使用时需要指定Exchange和Queue才能传递消息,接下来我们便介绍一下各个交换机的特点,然后通过建立一个简单的SpringBoot例子来讲解各个交换机类型的用法,在开始例子前,我们先来说一说各个交换机的特点。
直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。
直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。
下面介绍它是如何工作的: 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key) 当一个携带着路由键为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列。同样的一个routing key也是支持应用到多个队列中的。 直连交换机经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。
它的工作原理图如下:
生产者发送一个routing_key为key2的消息,到达直连交换机后,交换机会把消息投递给绑定routing_key也是key2的队列,然后再投递给监听该队列的用户。
这里的匹配就是完全匹配,这个模式就是直连交换机。
扇型交换机(fanout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。 如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时, 交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。所以扇形交换机主要做的就是广播消息。
因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以它的应用案例都极其相似:
大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端分发系统使用它来广播各种状态和配置更新在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP没有内置presence的概念,因此XMPP可能会是个更好的选择)它的工作原理图如下:
生产者发送一个消息给扇形交换机,扇形交换机会把消息投递给所有绑定了它的队列,而这个与routing_key(路由键)无关,它采用广播机制传递消息。
主题交换机(topic exchange)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。 主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
主题交换机非常灵活,它是通过使用带规则的routing_key来对实现对消费分配到队列中,可以实现一个消息发送给一个或多个队列中。
主题交换机的routing_key需要有一定的规则,采用.#.…..的格式,每个部分用.分开,其中:
*(星号):可以(只能)匹配一个单词#(井号):可以匹配多个单词(或者零个)假设有一条消息的routing_key为net.rabbit.kk,那么带有这样binding_key(和交换机绑定的routing_key)的几个队列都会接收这条消息:
1、net.*.* 2、*.*.kk 3、net.#
当然还有其他的组合,这里只是列举了一些。其实就和正则的匹配一样,只要满足就可以传递到该队列中去。
主题交换机拥有非常广泛的用户案例。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的多消费者/多应用(multiple consumers/applications) 的时候,主题交换机都可以被列入考虑范围。
使用案例:
分发有关于特定地理位置的数据,例如销售点由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务股票价格更新(以及其他类型的金融数据更新)涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)云端的不同种类服务的协调分布式架构/基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。它的工作原理图如下:
生产者发送一个routing_key为“uu.kk.kk”的消息,到主题连交换机后,交换机会根据routing_key的匹配规则(就是上面介绍的#、*的规则),把消息投递给绑定routing_key且符合规则的队列,然后再投递给监听该队列的用户。
这里的匹配就是正则匹配模式,这个模式就是主题交换机。
有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。 头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
我们可以绑定一个队列到头交换机上,并给他们之间的绑定使用多个用于匹配的头(header)。 在头交换机中需要考虑的是需要部分匹配还是全部匹配。相比较于直达交换机,头交换机的优势是匹配的规则不被限定为字符串,头交换机需要在队列绑定的规则中指定消息头和匹配的规则。
匹配规则x-match有下列两种类型:
x-match = all :表示所有的键值对都匹配才能接受到消息x-match = any :表示只要有键值对匹配就能接受到消息就是在指定消息头(键值对)时,添加”x-match”参数,当”x-match”设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当”x-match”设置为“all”的时候,就需要消息头的所有值都匹配成功。
头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。
它的工作原理图如下:
生产者发送一个含有消息头为 {“key2”:“value2”} 的消息,到头交换机后,交换机会根据队列绑定消息头中“x-match”的匹配规则(就是上面介绍的all、any的规则),把消息投递给满足消息头匹配规则的队列中,然后再投递给监听该队列的用户。
这里的匹配就是消息头(键值对)模式,这个模式就是头交换机。
到此,RabbitMQ交换机的就介绍完毕,这里总结一下:
直达交换机:先匹配,再投递,即在绑定时设定一个routing_key,消息的routing_key完全匹配时,才会被交换机投递到绑定的队列中。 扇形交换机:与routing_key无关,把所有消息投递给所有绑定的队列中。 主题交换机:绑定routing_key,在匹配routing_key时按照正则配置的规则投递消息到队列中,这个也是最灵活的交换机。 头交换机:使用多个消息属性来替代路由键建立路由规则,可以实现部分匹配或全部匹配。
注意:这里一个routing_key可以绑定到多个队列中,也就是说一个队列可以绑定多个不同的routing_key,这个可以根据具体实际需求来设计。
上面我们对RabbitMQ的队列类型进行了大致的介绍,接下来我们便开始结合代码来分析:
这里我使用Eclipse,新建一个名为rabbitmq的maven工程。这里spring-boot使用1.4.1.RELEASE版本。在pom文件中引入依赖包,
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.1.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> </dependencies>在项目下新建resources文件夹,新建文件application.properties,然后配置以下代码,详细的意思可以查看注释。
# 应用名称 spring.application.name=rabbitmq # RabbitMQ主机地址 spring.rabbitmq.host=localhost # RabbitMQ端口号 spring.rabbitmq.port=5672 # RabbitMQ用户名 spring.rabbitmq.username=guest # RabbitMQ密码 spring.rabbitmq.password=guest # 支持发布确认 spring.rabbitmq.publisher-confirms=true # 支持发布返回 spring.rabbitmq.publisher-returns=true # 虚拟主机名称 spring.rabbitmq.virtual-host=/ # 采用手动应答 # spring.rabbitmq.listener.acknowledge-mode=manual然后新建controller、config、sender、receiver包名,新建文件如下所示:
这里我们在配置文件中新建hello、user队列,RabbitConfig.java的代码如下:
package net.anumbrella.rabbitmq.config; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; @Configuration public class RabbitConfig { @Value("${spring.rabbitmq.host}") private String addresses; @Value("${spring.rabbitmq.port}") private String port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; @Value("${spring.rabbitmq.publisher-confirms}") private boolean publisherConfirms; /** * 申明hello队列 * * @return */ @Bean public Queue helloQueue() { return new Queue("hello"); } /** * 申明user队列 * * @return */ @Bean public Queue userQueue() { return new Queue("user"); } @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses + ":" + port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); /** 如果要进行消息回调,则这里必须要设置为true */ connectionFactory.setPublisherConfirms(publisherConfirms); return connectionFactory; } /** * 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 * * @return */ @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplatenew() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } }在sender包下,新建HelloReceiver1.java,HelloReceiver2.java,代码如下所示:
HelloReceiver1.java:
package net.anumbrella.rabbitmq.receiver; import java.io.IOException; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; @Component public class HelloReceiver1 { @RabbitListener(queues = "hello") public void process(Message message, Channel channel) throws IOException { System.out.println("Receiver1 : " + new String(message.getBody())); } }HelloReceiver2.java:
package net.anumbrella.rabbitmq.receiver; import java.io.IOException; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; @Component public class HelloReceiver2 { @RabbitListener(queues = "hello") public void process(Message message, Channel channel) throws IOException { System.out.println("Receiver2 : " + new String(message.getBody())); } }这里@RabbitListener就是监听某个队列,如果该队列中有消息时,就会发送给用户。
同理发送者,在sender包名下建立HelloSender1.java、HelloSender2.java,代码如下:
HelloSender1.java:
package net.anumbrella.rabbitmq.sender; import java.text.SimpleDateFormat; import java.util.Date; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class HelloSender1 { @Autowired private AmqpTemplate rabbitTemplate; public void send(String msg) { SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String sendMsg = msg + time.format(new Date()) + " hello1 "; System.out.println("Sender1 : " + sendMsg); this.rabbitTemplate.convertAndSend("hello", sendMsg); } }HelloSender2.java:
package net.anumbrella.rabbitmq.sender; import java.text.SimpleDateFormat; import java.util.Date; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class HelloSender2 { @Autowired private AmqpTemplate rabbitTemplate; public void send(String msg) { SimpleDateFormat time=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String sendMsg = msg + time.format(new Date()) + " hello2 "; System.out.println("Sender2 : " + sendMsg); this.rabbitTemplate.convertAndSend("hello", sendMsg); } }注意:这里我们把消息的发送者和消息的消费者写在了一个程序当中了,在实际的情况中可能会在不同的程序当中。
接下来我们在controller包下,新建RabbitTest.java,通过Restful接口来模拟触发消息的发送。
RabbitTest.java:
package net.anumbrella.rabbitmq.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import net.anumbrella.rabbitmq.sender.HelloSender1; import net.anumbrella.rabbitmq.sender.HelloSender2; @RestController @RequestMapping("/rabbit") public class RabbitTest { @Autowired private HelloSender1 helloSender1; @Autowired private HelloSender2 helloSender2; /** * 单生产者-单消费者 */ @GetMapping("/hello") public void hello() { helloSender1.send("hello1"); } /** * 单生产者-多消费者 */ @GetMapping("/oneToMany") public void oneToMany() { for (int i = 0; i < 4; i++) { helloSender1.send("第[" + (i + 1) + "]个 ---------> "); } } /** * 多生产者-多消费者 */ @GetMapping("/manyToMany") public void manyToMany() { for (int i = 0; i < 4; i++) { helloSender1.send("第[" + (i + 1) + "]个 ---------> "); helloSender2.send("第[" + (i + 1) + "]个 ---------> "); } } }这里我们主要模拟了三种情况:
单生产者-单消费者单生产者-多消费者多生产者-多消费者更改App.java为spring-boot启动类,运行程序,通过访问http://localhost:8080/rabbit/hello,http://localhost:8080/rabbit/oneToMany,http://localhost:8080/rabbit/manyToMany来模拟上面的三种情况。
当我们运行起程序后,我们可以在RabbitMQ服务端管理界面查看到新建的连接和新生成的队列:
然后我们在浏览器里访问hello、oneToMany、manyToMany可以分别得到以下消息: 可以看到用户发送了消息并且收到了消息,这里一个用户发送并且一个消费者收到了。
Sender1 : hello1 2018-05-12 16:56:35 hello1 Receiver1 : hello1 2018-05-12 16:56:35 hello1这里我们一个用户发送消息,并且多个消费者收到了消息。
Sender1 : 第[1]个 ---------> 2018-05-12 17:05:48 hello1 Sender1 : 第[2]个 ---------> 2018-05-12 17:05:49 hello1 Sender1 : 第[3]个 ---------> 2018-05-12 17:05:49 hello1 Sender1 : 第[4]个 ---------> 2018-05-12 17:05:49 hello1 Receiver1 : 第[1]个 ---------> 2018-05-12 17:05:48 hello1 Receiver2 : 第[2]个 ---------> 2018-05-12 17:05:49 hello1 Receiver2 : 第[3]个 ---------> 2018-05-12 17:05:49 hello1 Receiver1 : 第[4]个 ---------> 2018-05-12 17:05:49 hello1现在这里是多个用户发送消息,并且多个用户收到了消息。
Sender1 : 第[1]个 ---------> 2018-05-12 17:06:21 hello1 Sender2 : 第[1]个 ---------> 2018-05-12 17:06:21 hello2 Sender1 : 第[2]个 ---------> 2018-05-12 17:06:21 hello1 Sender2 : 第[2]个 ---------> 2018-05-12 17:06:21 hello2 Sender1 : 第[3]个 ---------> 2018-05-12 17:06:21 hello1 Sender2 : 第[3]个 ---------> 2018-05-12 17:06:21 hello2 Sender1 : 第[4]个 ---------> 2018-05-12 17:06:21 hello1 Sender2 : 第[4]个 ---------> 2018-05-12 17:06:21 hello2 Receiver1 : 第[1]个 ---------> 2018-05-12 17:06:21 hello1 Receiver2 : 第[1]个 ---------> 2018-05-12 17:06:21 hello2 Receiver1 : 第[2]个 ---------> 2018-05-12 17:06:21 hello1 Receiver2 : 第[2]个 ---------> 2018-05-12 17:06:21 hello2 Receiver1 : 第[3]个 ---------> 2018-05-12 17:06:21 hello1 Receiver2 : 第[3]个 ---------> 2018-05-12 17:06:21 hello2 Receiver1 : 第[4]个 ---------> 2018-05-12 17:06:21 hello1 Receiver2 : 第[4]个 ---------> 2018-05-12 17:06:21 hello2到这里你可能开始有些疑惑了,我们在模拟发送单生产者-单消费者时,你可能会想为啥是receiver1收到了消息,而receiver2没有消息,为啥是receiver1而不是receiver2? 其实当我们再次模拟访问hello时,你就会发现现在是receiver2收到消息了,再访问又是receiver1收到消息了。 多次访问hello结果如下:
Sender1 : hello1 2018-05-12 17:10:01 hello1 Receiver2 : hello1 2018-05-12 17:10:01 hello1 Sender1 : hello1 2018-05-12 17:10:06 hello1 Receiver1 : hello1 2018-05-12 17:10:06 hello1 Sender1 : hello1 2018-05-12 17:10:09 hello1 Receiver2 : hello1 2018-05-12 17:10:09 hello1 Sender1 : hello1 2018-05-12 17:10:19 hello1 Receiver1 : hello1 2018-05-12 17:10:19 hello1这里我们仔细研究就会发现oneToMany就是这种情况,这是为啥?
这里就要讲到RabbitMQ的消息机制了,RabbitMQ默认的消息机制其实是轮询分发,只是spring-boot中结合的RabbitMQ封装的,采用了公平分发机制。所以在这里采用的消息机制是公平分发。
轮询分发是指每个消费者都会受到服务发送来的消息,按照依次的顺序来处理而不管消费者是否有能力处理。 公平分发则是按消费者的处理能力来进行处理,通过设置prefetchCount参数来让用户可以根据能力处理消息。
至于这里为什么是公平分发和关于消息处理机制处理的特点后面会有文章进行介绍,这里大致了解即可。
即这里可以设置让每个消费者不是随机获得,可以让能者多得。
我们也知道每次发送消息都要指定交换机,为啥这里没有声明交换机,也没有使用routing_key,消息也走通了?
这是因为RabbitMQ还有一个默认交换机:
默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。 它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
举个栗子:当你声明了一个名为”hello”的队列,AMQP代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为”hello”。因此,当携带着名为”hello”的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为”hello”的队列中。
换句话说,默认交换机看起来貌似能够直接将消息投递给队列,尽管技术上并没有做相关的操作。
所以我们发送消息时routing_key直接为队列名称了。这里就是使用了默认的直达交换机。
接下来我们再来使用主题交换机、扇形交换机和头交换机: 在RabbitConfig.java 中新增主题交换机、扇形交换机、头交换机和相关队列、路由键的声明、绑定规则。
新增代码如下:
// ===============以下是验证topic Exchange的队列和交互机========== @Bean public Queue queueMessage() { return new Queue("topic.message"); } @Bean public Queue queueMessages() { return new Queue("topic.messages"); } @Bean TopicExchange topicExchange() { return new TopicExchange("topicExchange"); } /** * 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配 * * @param queueMessage * @param exchange * @return */ @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange topicExchange) { return BindingBuilder.bind(queueMessage).to(topicExchange).with("topic.message"); } /** * 将队列topic.messages与exchange绑定,binding_key为topic.#,模糊匹配 * * @param queueMessage * @param exchange * @return */ @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange topicExchange) { return BindingBuilder.bind(queueMessages).to(topicExchange).with("topic.#"); } // ===============以上是验证topic Exchange的队列和交互机========== // ===============以下是验证Fanout Exchange的队列和交互机========== @Bean public Queue fanoutQueueA() { return new Queue("fanout.A"); } @Bean public Queue fanoutQueueB() { return new Queue("fanout.B"); } @Bean public Queue fanoutQueueC() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA(Queue fanoutQueueA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue fanoutQueueB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue fanoutQueueC, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueueC).to(fanoutExchange); } // ===============以上是验证Fanout Exchange的队列和交互机========== // ===============以下是验证Direct Exchange的队列和交互机========== @Bean public Queue directQueueA() { return new Queue("direct.A"); } @Bean public Queue directQueueB() { return new Queue("direct.B"); } @Bean public Queue directQueueC() { return new Queue("direct.C"); } @Bean DirectExchange directExchange() { return new DirectExchange("directExchange"); } @Bean Binding bindingDirectExchangeA(Queue directQueueA, DirectExchange directExchange) { return BindingBuilder.bind(directQueueA).to(directExchange).with("direct.a"); } @Bean Binding bindingDirectExchangeB(Queue directQueueB, DirectExchange directExchange) { return BindingBuilder.bind(directQueueB).to(directExchange).with("direct.b"); } @Bean Binding bindingDirectExchangeC(Queue directQueueC, DirectExchange directExchange) { return BindingBuilder.bind(directQueueC).to(directExchange).with("direct.c"); } // ===============以上是验证Direct Exchange的队列和交互机========== // ===============以下是验证Headers Exchange的队列和交互机========== @Bean public Queue headersQueueA() { return new Queue("headers.A"); } @Bean HeadersExchange headersExchange() { return new HeadersExchange("headersExchange"); } @Bean Binding bindingHeadersExchangeA(Queue headersQueueA, HeadersExchange headersExchange) { // 这里x-match有两种类型 // all:表示所有的键值对都匹配才能接受到消息 // any:表示只要有键值对匹配就能接受到消息 return BindingBuilder.bind(headersQueueA).to(headersExchange).where("age").exists(); } // ===============以上是验证Headers Exchange的队列和交互机==========然后再在sender和receiver中添加相应的发送者和接受者: TopicSender.java、FanoutSender.java、DirectSender.java 、HeadersSender.java TopicReceiver.java、FanoutReceiver.java、DirectReceiver.java 、HeadersReceiver.java
TopicSender.java:
package net.anumbrella.rabbitmq.sender; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class TopicSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String msg1 = "I am topic.mesaage msg======"; System.out.println("sender1 : " + msg1); this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", msg1); String msg2 = "I am topic.mesaages msg########"; System.out.println("sender2 : " + msg2); this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", msg2); } }TopicReceiver.java:
package net.anumbrella.rabbitmq.receiver; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class TopicReceiver { @RabbitListener(queues = "topic.message") public void processA(String msg) { System.out.println("topicMessageReceiver : " +msg); } @RabbitListener(queues = "topic.messages") public void processB(String msg) { System.out.println("topicMessagesReceiver : " +msg); } }FanoutSender.java:
package net.anumbrella.rabbitmq.sender; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class FanoutSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String msgString="fanoutSender : hello i am anumbrella"; System.out.println(msgString); this.rabbitTemplate.convertAndSend("fanoutExchange","abcd.ee", msgString); } }FanoutReceiver.java:
package net.anumbrella.rabbitmq.receiver; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class FanoutReceiver { @RabbitListener(queues = "fanout.A") public void processA(String msg) { System.out.println("FanoutReceiverA : " + msg); } @RabbitListener(queues = "fanout.B") public void processB(String msg) { System.out.println("FanoutReceiverB : " + msg); } @RabbitListener(queues = "fanout.C") public void processC(String msg) { System.out.println("FanoutReceiverC : " + msg); } }DirectSender.java :
package net.anumbrella.rabbitmq.sender; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class DirectSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String msg1 = "I am direct.a msg ======="; System.out.println("sender1 : " + msg1); this.rabbitTemplate.convertAndSend("directExchange", "direct.a", msg1); String msg2 = "I am direct.b msg ======="; System.out.println("sender2 : " + msg2); this.rabbitTemplate.convertAndSend("directExchange", "direct.b", msg2); String msg3 = "I am direct.c msg ======="; System.out.println("sender3 : " + msg3); this.rabbitTemplate.convertAndSend("directExchange", "direct.c", msg3); } }DirectReceiver.java :
package net.anumbrella.rabbitmq.receiver; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class DirectReceiver { @RabbitListener(queues = "direct.A") public void processA(String msg) { System.out.println("DirectReceiverA : " + msg); } @RabbitListener(queues = "direct.B") public void processB(String msg) { System.out.println("DirectReceiverB : " + msg); } @RabbitListener(queues = "direct.C") public void processC(String msg) { System.out.println("DirectReceiverB : " + msg); } }HeadersSender.java:
package net.anumbrella.rabbitmq.sender; import java.util.Hashtable; import java.util.Map; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.core.MessagePropertiesBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class HeadersSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { Map<String, Object> headers = new Hashtable<String, Object>(); headers.put("name", "jack"); headers.put("age", 30); String content = headers.toString(); MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).setMessageId("123").setHeader("age", "30") .build(); Message message = MessageBuilder.withBody(content.getBytes()).andProperties(props).build(); System.out.println("sender1 : " + headers.toString()); this.rabbitTemplate.convertAndSend("headersExchange", "", message); } }HeadersReceiver.java :
package net.anumbrella.rabbitmq.receiver; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "headers.A") public class HeadersReceiver { @RabbitHandler public void process(String msg) { System.out.println("HeadersReceiverA : " + msg); } }然后我们在controller层添加访问接口代码,如下:
@Autowired private TopicSender topicSender; @Autowired private FanoutSender fanoutSender; @Autowired private DirectSender directSender; @Autowired private HeadersSender headersSender; /** * topic exchange类型rabbitmq测试 */ @GetMapping("/topicTest") public void topicTest() { topicSender.send(); } /** * fanout exchange类型rabbitmq测试 */ @GetMapping("/fanoutTest") public void fanoutTest() { fanoutSender.send(); } /** * direct exchange类型rabbitmq测试 */ @GetMapping("/directTest") public void directTest() { directSender.send(); } /** * headers exchange类型rabbitmq测试 */ @GetMapping("/headersTest") public void headersTest() { headersSender.send(); }我们分别进行topicTest、fanoutTest、directTest、headersTest访问,可以得到如下结果:
跟我们想的一样,队列top.message完全匹配,只有一个消息接收到,而队列topic.messages采用#匹配,会收到两条消息。完全符合topic模式。
当我们更改发送消息的routing_key时,发现结果是一样的。这样刚刚符合扇形交换机的特点,与routing_key无法,会把消息发送给所有绑定它的队列。
这里我们采用了自定义的直达交换机,没有采用默认的交换机,所以需要指定routing_key与绑定的要一致,不然队列是无法获得消息的。这也符合直达交换机完全匹配的特点。
头部交换机,这里绑定的头参数(键值对)采用“x-match”的“any”模式,所以只要满足含义age属性就可以传递消息到队列中。这里的匹配模式按spring-boot的形式进行了封装,只要含age就匹配。符合头交换机的匹配模式。
到此我们把四种交换机的基本用法介绍完了,除此之外,传递的消息还可以是实体类,继承序列化,在监听处接受实体类即可。 比如发送User实体类,该类采用序列化。在Sender中:
User user = new User(); user.setName("anumbrella"); user.setAddress("China"); System.out.println("user send : " + user.getName() + "/" + user.getAddress()); this.rabbitTemplate.convertAndSend("user", user);在Receiver中:
@RabbitListener(queues = "user") public void process(User user) { System.out.println("user receive : " + user.getName() + " / " + user.getAddress()); }除了发送实体类外,在前面RabbitMQ第二篇文章中,我们还介绍了RabbitMQ中的消息确认机制,在前面的例子当中,我们都是采用的自动确认机制,即是代码中自动返回了消息的确认。我们还可以采用手动确认消息。
在resources中把下面的代码注释打开,采用手动确认消息机制。
# 采用手动应答 spring.rabbitmq.listener.acknowledge-mode=manual然后我们在接收消息中,即Receiver中如下设置:
@RabbitListener(queues = "hello") public void process(Message message, Channel channel) throws IOException { System.out.println("CheckReceiver: " + new String(message.getBody())); try { doWork(); } catch (InterruptedException e) { e.printStackTrace(); } // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }注意:采用了手动确认消息后,所有消费者都需要手动确认消息是否收到完毕。不然RabbitMQ会认为消息投递失败,反复投递。
当然除了消息确认外,还有消息拒绝,当我们拒绝某个消息时,让RabbitMQ会把消息传递给它的下一个消费者接受该消息,直到该消息把确认收到为止,也可以让RabbitMQ把消息给删除掉。
这里的使用和消息确认基本一致,传递不同的参数采用不同的操作。
// true 发送给下一个消费者 // false 谁都不接受,从队列中删除 // 拒绝消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);除了上面的消息确认、拒绝外,RabbitMQ还带消息的回调确认,用户是否收到消息,发送者的消息是否成功投递,可以通过Callback中的确认来实现。 通过在Sender中实现RabbitTemplate.ConfirmCallback接口来实现该操作,如下:
@Component public class CallBackSender implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplatenew; public void send() { rabbitTemplatenew.setConfirmCallback(this); String msg = "callbackSender : i am callback sender"; System.out.println(msg); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); System.out.println("callbackSender UUID: " + correlationData.getId()); this.rabbitTemplatenew.convertAndSend("exchange", "topic.messages", msg, correlationData); } public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("callbakck confirm: " + correlationData.getId() + " ACK : " + ack); } }通过ACK的返回值,我们可以确认用户是否消费掉该消息没有,然后做后续的操作。 这里具体的细节我就没有详细列出来,我放在下面的demo代码里面,读者可以去代码里面看详细的用法。
到这里RabbitMQ的交换机用法实例就介绍完毕了,上面的demo我放在github了,大家可以去结合实际代码来学习。地址rabbitmq-demo。