Spring Boot Messaging
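The Spring Framework provides extensive support for integrating with messaging systems, from simplified use of the JMS API through JmsTemplate to a complete infrastructure for receiving messages asynchronously. Spring AMQP provides a similar feature set for the Advanced Message Queuing Protocol. Spring Boot also provides auto-configuration options for RabbitTemplate and RabbitMQ. Spring WebSocket natively supports STOMP messaging, and Spring Boot supports it through starters and a small amount of auto-configuration. Spring Boot also supports Apache Kafka.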
AMQP
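The Advanced Message Queuing Protocol (AMQP) is a platform-neutral, wire-level protocol for message-oriented middleware. The Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. Spring Boot offers several conveniences for working with AMQP through RabbitMQ, including the spring-boot-starter-amqp "Starter".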
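RabbitMQ Support
RabbitMQ is a lightweight, reliable, scalable, and portable message broker based on the AMQP protocol. Spring communicates with RabbitMQ over AMQP.
Installing the RabbitMQ server, using macOS as an example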
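    brew install rabbitmq

Once the installation finishes, change to the installation directory '/usr/local/Cellar/rabbitmq/3.7.4/sbin' and start the server:

    cd /usr/local/Cellar/rabbitmq/3.7.4/sbin
    ./rabbitmq-server

If the server starts successfully, you will see: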
      ##  ##
      ##  ##      RabbitMQ 3.7.4. Copyright (C) 2007-2018 Pivotal Software, Inc.
      ##########  Licensed under the MPL.  See http://www.rabbitmq.com/
      ######  ##
      ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
                        /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log

                  Starting broker...
     completed with 6 plugins.

src/main/java/com/lee/springbootdemo/pojo/Receiver.java
    package com.lee.springbootdemo.pojo;

    import org.springframework.stereotype.Component;

    import java.util.concurrent.CountDownLatch;

    @Component
    public class Receiver {

        // Lets the sender wait until one message has been received.
        private final CountDownLatch latch = new CountDownLatch(1);

        // Invoked by the MessageListenerAdapter for each incoming message.
        public void receiveMessage(String message) {
            System.out.println("Received <" + message + ">");
            latch.countDown();
        }

        public CountDownLatch getLatch() {
            return latch;
        }
    }

src/main/java/com/lee/springbootdemo/config/RabbitMQConfig.java
    package com.lee.springbootdemo.config;

    import com.lee.springbootdemo.pojo.Receiver;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitMQConfig {

        public static final String topicExchangeName = "spring-boot-exchange";
        public static final String queueName = "spring-boot";

        // Non-durable queue: it does not survive a broker restart.
        @Bean
        public Queue queue() {
            return new Queue(queueName, false);
        }

        // A topic exchange is required for the "#" wildcard in the binding below.
        @Bean
        public TopicExchange exchange() {
            return new TopicExchange(topicExchangeName);
        }

        @Bean
        public Binding binding(Queue queue, TopicExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
        }

        // Listens on the queue and hands each message to the adapter.
        @Bean
        public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                        MessageListenerAdapter listenerAdapter) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queueName);
            container.setMessageListener(listenerAdapter);
            return container;
        }

        // Delegates incoming messages to Receiver.receiveMessage(String).
        @Bean
        public MessageListenerAdapter listenerAdapter(Receiver receiver) {
            return new MessageListenerAdapter(receiver, "receiveMessage");
        }
    }

The queue() method creates an AMQP queue.
The exchange() method creates a topic exchange.
The binding() method binds the two together, defining what happens when RabbitTemplate publishes to the exchange.
Spring AMQP requires that the Queue, the TopicExchange, and the Binding be declared as top-level Spring beans in order to be set up properly.
Here, the queue is bound to the topic exchange with the routing key foo.bar.#, which matches routing keys such as foo.bar.baz.
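The wildcard in a binding key such as foo.bar.# is interpreted only by a topic exchange; a direct exchange compares the routing key and binding key for exact equality. The following is a minimal sketch of the topic matching rules (`*` stands for exactly one dot-separated word, `#` for zero or more words). The class name TopicPatternSketch and its matches method are hypothetical names introduced for illustration; this is not RabbitMQ's or Spring AMQP's implementation.

```java
/** Simplified illustration of AMQP topic-pattern matching (hypothetical helper). */
final class TopicPatternSketch {

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: succeed only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        // '*' matches any single word; otherwise the words must be equal.
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("foo.bar.#", "foo.bar.baz")); // true
        System.out.println(matches("foo.bar.#", "foo.baz"));     // false
        System.out.println(matches("foo.*", "foo.bar"));         // true
        System.out.println(matches("foo.*", "foo.bar.baz"));     // false
    }
}
```

Under these rules, a message published with the routing key foo.bar.baz satisfies the binding key foo.bar.# and is delivered to the bound queue.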
src/main/java/com/lee/springbootdemo/pojo/Runner.java
    package com.lee.springbootdemo.pojo;

    import com.lee.springbootdemo.config.RabbitMQConfig;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.TimeUnit;

    @Component
    public class Runner implements CommandLineRunner {

        private final RabbitTemplate rabbitTemplate;
        private final Receiver receiver;

        public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
            this.receiver = receiver;
        }

        @Override
        public void run(String... args) throws Exception {
            System.out.println("Sending message ...");
            rabbitTemplate.convertAndSend(RabbitMQConfig.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
            // Wait up to 10 seconds for the Receiver to count down its latch.
            receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
        }
    }

Here, the message is sent with the routing key foo.bar.baz.
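The Runner and the Receiver coordinate through a CountDownLatch: the sender waits, with a timeout, until the asynchronous listener has counted down. The sketch below shows that handshake in plain Java, under the assumption that an ordinary thread stands in for the listener container; LatchHandshakeSketch and sendAndAwait are hypothetical names. It also shows that await returns a boolean, which the Runner above ignores, so a caller could use it to detect that no message arrived in time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Plain-Java illustration of the latch handshake (hypothetical stand-in, no broker involved). */
final class LatchHandshakeSketch {

    /**
     * Simulates sending a message and waiting for its receipt.
     * Returns true if the simulated listener counted down before the timeout.
     */
    static boolean sendAndAwait(boolean deliver, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        if (deliver) {
            // Stand-in for the listener thread that would call receiveMessage().
            Thread listener = new Thread(() -> {
                System.out.println("Received <Hello>");
                latch.countDown();
            });
            listener.start();
        }
        try {
            // await returns false if the timeout elapses before the count reaches zero.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("delivered in time: " + sendAndAwait(true, 2000));
        System.out.println("delivered in time: " + sendAndAwait(false, 50));
    }
}
```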
Run the SpringBootDemoApplication.main() method. If it runs successfully, you will see:

    Sending message ...
    Received <Hello from RabbitMQ!>