5. Spring Boot Messaging

xiaoxiao2025-08-06  32

文章目录

第一章 说明第二章 编写代码2.1 启动 RabbitMQ2.2 代码2.2.1 文件结构2.2.2 创建 RabbitMQ 消息接受2.2.2 注册监听和发送消息2.2.3 测试发布消息2.2.4 运行 附录 A 参考

第一章 说明

Spring Boot Messaging

Spring框架提供了与消息传递系统集成的广泛支持,从使用JmsTemplate简化JMS API到异步接收消息的完整基础结构。Spring AMQP为高级消息队列协议提供了类似的特性集。Spring Boot还为RabbitTemplate和RabbitMQ提供了自动配置选项。Spring WebSocket本身就支持STOMP消息传递,Spring Boot通过启动器和少量的自动配置支持这一点。Spring Boot也支持Apache Kafka。

AMQP

高级消息队列协议(Advanced Message queue Protocol, AMQP)是面向消息中间件的平台无关的、线级协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。Spring Boot为通过RabbitMQ使用AMQP提供了一些便利,包括Spring-Boot-Starter-AMQP“Starter”。

RabbitMQ 扩展

RabbitMQ是一个基于AMQP协议的轻量级、可靠、可伸缩和可移植的消息代理。Spring使用RabbitMQ通过AMQP协议进行通信。

第二章 编写代码

2.1 启动 RabbitMQ

安装 RabbitMQ 服务,以 Mac 为例

brew install rabbitmq

等待安装完成后,进入安装目录 ‘/usr/local/Cellar/rabbitmq/3.7.4/sbin’,启动

cd /usr/local/Cellar/rabbitmq/3.7.4/sbin ./rabbitmq-server

如果启动成功,将会看到

## ## ## ## RabbitMQ 3.7.4. Copyright (C) 2007-2018 Pivotal Software, Inc. ########## Licensed under the MPL. See http://www.rabbitmq.com/ ###### ## ########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log Starting broker... completed with 6 plugins.

2.2 代码

2.2.1 文件结构

src/ +- main/ +- java/ +- com/ +- lee/ +- springbootdemo/ +- config +- RabbitMQConfig.java +- pojo/ +- Receiver.java +- Runner.java +- SpringBootDemoApplication.java +- resources/ +- <other resource>

2.2.2 创建 RabbitMQ 消息接受

src/main/java/com/lee/springbootdemo/pojo/Receiver.java

package com.lee.springbootdemo.pojo; import org.springframework.stereotype.Component; import java.util.concurrent.CountDownLatch; @Component public class Receiver { private CountDownLatch latch = new CountDownLatch(1); public void receiveMessage(String message) { System.out.println("Received <" + message + ">"); latch.countDown(); } public CountDownLatch getLatch() { return latch; } }

2.2.2 注册监听和发送消息

src/main/java/com/lee/springbootdemo/config/RabbitMQConfig.java

package com.lee.springbootdemo.config; import com.lee.springbootdemo.pojo.Receiver; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { public static final String topicExchangeName = "spring-boot-exchange"; public static final String queueName = "spring-boot"; @Bean public Queue queue() { return new Queue(queueName, false); } @Bean public DirectExchange exchange() { return new DirectExchange(topicExchangeName); } @Bean public Binding binding(Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#"); } @Bean public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName); container.setMessageListener(listenerAdapter); return container; } @Bean public MessageListenerAdapter listenerAdapter(Receiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } }

queue() 方法创建一个 AMQP 队列

exchange() 方法创建一个 direct 交换器

binding() 方法把上面两个内容绑定在一起,定义 RabbitTemplate 发布消息的行为

Spring AMQP要求将队列、TopicExchange和绑定声明为顶级Spring bean,以便正确设置。

在这里,我们把 direct exchange 和 queue 绑定的路由键(routing key)是 foo.bar.#

2.2.3 测试发布消息

src/main/java/com/lee/springbootdemo/pojo/Runner.java

package com.lee.springbootdemo.pojo; import com.lee.springbootdemo.config.RabbitMQConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @Component public class Runner implements CommandLineRunner { private final RabbitTemplate rabbitTemplate; private final Receiver receiver; public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; this.receiver = receiver; } @Override public void run(String... args) throws Exception { System.out.println("Sending message ..."); rabbitTemplate.convertAndSend(RabbitMQConfig.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!"); receiver.getLatch().await(10000, TimeUnit.MILLISECONDS); } }

这里我们定义的路由键(routing key)是foo.bar.baz

2.2.4 运行

运行 SpringBootDemoApplication.main() 方法,如果运行成功,将会看到

Sending message ... Received <Hello from RabbitMQ!>

附录 A 参考

Spring Boot Messaging 官方文档Spring Messaging 代码案例

上一篇:4. Spring Boot Caching Redis

下一篇:

转载请注明原文地址: https://www.6miu.com/read-5034387.html

最新回复(0)