八.spring+rabbitmq

xiaoxiao2021-02-28  76

一.spring+rabbitmq使用main方法集成 1.pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tiglle</groupId> <artifactId>spring-rabbitmq-main</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <!-- spring和mq集成包 --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.1.RELEASE</version> </dependency> <!-- logback日志 --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.1</version> </dependency> </dependencies> </project>

2.Producer.java:

package com.rabbit.producer.main; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; public class Producer { private static Logger logger = LoggerFactory.getLogger(Producer.class); public static void main(String[] args) { //获取一个连接工厂,用户默认是guest/guest(只能使用部署在本机的RabbitMQ) //是Spring实现的对com.rabbitmq.client.Connection的包装 ConnectionFactory cf = new CachingConnectionFactory("localhost"); //对AMQP 0-9-1的实现 RabbitAdmin admin = new RabbitAdmin(cf); //声明一个队列 Queue queue = new Queue("myQueue"); admin.declareQueue(queue); //声明一个exchange类型为topic TopicExchange exchange = new TopicExchange("myExchange"); admin.declareExchange(exchange); //绑定队列到exchange,并指定routingKey为foo.* admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*")); //发送模版,设置上连接工厂 RabbitTemplate template = new RabbitTemplate(cf); //发送消息 /** * 1.String exchange:exchange的名称 * 2.String routingKey:routingKey的名称 * 3.Object message:要像exchange发送的消息 */ template.convertAndSend("myExchange", "foo.bar", "Hello Tiglle"); logger.info("Produce发送消息到"+exchange.getName()+"的exchange上," + "queueName="+queue.getName()+",routingKey=foo.*"); } }

3.Consumer.java:

package com.rabbit.comsumer.main; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; public class Comsumer { public static void main(String[] args) { //获取一个连接工厂,用户默认是guest/guest(只能使用部署在本机的RabbitMQ) //是Spring实现的对com.rabbitmq.client.Connection的包装 ConnectionFactory cf = new CachingConnectionFactory("localhost"); //监听容器 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf); //监听者对象 Object listener = new Object() { @SuppressWarnings("unused") public void handleMessage(String foo) { System.out.println(foo); } }; //通过这个适配器代理listener MessageListenerAdapter adapter = new MessageListenerAdapter(listener); //把适配器(listener)设置给Container container.setMessageListener(adapter); //设置该容器监听的队列名,可以传多个,public void setQueueNames(String... queueName) container.setQueueNames("myQueue"); //开始监听 container.start(); } }

启动Procuder和Consumer可以成功发送接收消息

二.通过配置文件配置 1.pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tiglle</groupId> <artifactId>rabbitmq-spring</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <!-- spring包 --> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.7.RELEASE</version> </dependency> <!-- spring和mq集成包 --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.1.RELEASE</version> </dependency> <!-- logback日志 --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.1</version> </dependency> </dependencies> </project>

2.spring+rabbitmq配置文件:applicationContext-rabbit.xml

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd "> <!-- spring配置 --> <!-- 开启注解驱动 --> <context:annotation-config/> <!-- 扫包 --> <context:component-scan base-package="com.rabbit"/> <!-- rabbit的配置 --> <!-- 配置rabbit的ConnectionFactory --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" username="guest" password="guest" port="5672" /> <!-- 等同new一个RabbitAdmin --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 声明一个队列 --> <rabbit:queue name="myQueue" /> <!-- 声明一个topic类型的exchange,并把上面声明的队列绑定在上面,给routingKey="core.*" --> <rabbit:topic-exchange name="myExchange"> <rabbit:bindings> <rabbit:binding queue="myQueue" pattern="core.*" /> <!-- 还可以绑定其他列队... --> </rabbit:bindings> </rabbit:topic-exchange> <!-- 声明一个rabbitTemplate,指定连接信息,发送消息到myExchange上,routingKey在程序中设置,此处的配置在程序中可以用set修改 --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="myExchange"/> <!-- 配置监听容器,指定消息处理类,监听在Spring环境启动后就自动开始了 --> <rabbit:listener-container connection-factory="connectionFactory"> <!-- ref=注册进spring容器的类的id,method=此类中的哪个方法来处理,queue-names=上面配置的queueName --> <rabbit:listener ref="consumer" method="consumerMessage" queue-names="myQueue"/> <!-- 可以注册其他监听... --> </rabbit:listener-container> </beans>

3.消费者,通过注解注入spring容器中的:Consumer.java

package com.rabbit.consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; //注入spring容器 @Component public class Consumer {//配置文件raf的类 Logger logger = LoggerFactory.getLogger(Consumer.class); //配置文件指定的消息处理的方法 public void consumerMessage(String message){ logger.info("接收的消息为:"+message); } }

4.测试启动spring并发送消息的Mian方法:ProducerMain.java

package com.rabbit.main; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.rabbit.consumer.Consumer; public class ProducerMain { static Logger logger = LoggerFactory.getLogger(ProducerMain.class); //发送者 public static void main(String[] args) throws InterruptedException { //启动spring容器,启动后消费者就会一直监听 AbstractApplicationContext beans = new ClassPathXmlApplicationContext("applicationContext.xml"); //假装是Autowrited的 //@Autowrited RabbitTemplate rabbitTemplate = beans.getBean(RabbitTemplate.class); //设置routingKey rabbitTemplate.setRoutingKey("core.info"); //发送,exchange,routingKey都在配置文件中配置好了 rabbitTemplate.convertAndSend("hellow tiglle"); logger.info("发送的消息为:hellow tiglle"); //关闭掉spring容器 Thread.sleep(1000); beans.destroy(); } }
转载请注明原文地址: https://www.6miu.com/read-46193.html

最新回复(0)