rabbitmq的简单使用(3)

xiaoxiao2021-02-28  104

本文是基于spring和rabbitmq的,部分资料来自网上,如有问题,请联系作者

1、配置文件

1.1 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sdnware</groupId> <artifactId>start04</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>start04</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!-- https://mvnrepository.com/artifact/org.seleniumhq.selenium/selenium-java <dependency> <groupId>org.seleniumhq.selenium</groupId> <artifactId>selenium-java</artifactId> <version>3.3.1</version> </dependency> --> <!-- https://mvnrepository.com/artifact/com.github.detro.ghostdriver/phantomjsdriver <dependency> <groupId>com.github.detro.ghostdriver</groupId> <artifactId>phantomjsdriver</artifactId> <version>1.1.0</version> </dependency> --> <!-- https://mvnrepository.com/artifact/org.jboss.arquillian.extension/arquillian-phantom-driver <dependency> <groupId>org.jboss.arquillian.extension</groupId> <artifactId>arquillian-phantom-driver</artifactId> <version>1.2.1.1</version> </dependency> --> <!-- https://mvnrepository.com/artifact/commons-io/commons-io --> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.4</version> </dependency> <dependency> <groupId>org.ehcache</groupId> <artifactId>ehcache</artifactId> <version>3.3.1</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.6.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-spring</artifactId> <version>5.14.5</version> </dependency> <!-- <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency>--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.2.RELEASE</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> </dependencies> </project>

1.2 spring配置文件

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd "> <rabbit:connection-factory id="connectionFactory" host="192.168.100.205" username="admin" password="sdnware" port="5672" virtual-host="/" /> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:template exchange="test-mq-exchange" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <!-- 申明一个消息队列Queue --> <rabbit:queue id="test_queue" name="test_queue" durable="true" auto-delete="false" exclusive="false"/> <!-- 1、交换机定义 2、rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。 rabbit:topic-exchange 此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:'*','#'. rabbit:fanout-exchange 此exchange的路由规则很简单直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。 --> <rabbit:direct-exchange name="test-mq-exchange" durable="true" auto-delete="false" id="test-mq-exchange"> <rabbit:bindings> <rabbit:binding queue="test_queue" key="test_queue_key"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- 使用监听的方式进行消息接收 <bean id="queueListenter" class="com.sdnware.start04.rabbitmq.spring.QueueListenter"></bean> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <rabbit:listener queues="test_queue" ref="queueListenter" /> </rabbit:listener-container> --> </beans>

2、生产者

package com.sdnware.start04.rabbitmq.spring; public interface MQProducer { /** * 发送消息到指定队列 * @param queueKey * @param object */ public void sendDataToQueue(String queueKey, Object object); } package com.sdnware.start04.rabbitmq.spring; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; public class MQProducerImpl implements MQProducer { private static Logger LOG = LoggerFactory.getLogger(MQProducerImpl.class); private AmqpTemplate amqpTemplate; public MQProducerImpl(AmqpTemplate amqpTemplate) { this.amqpTemplate = amqpTemplate; } @Override public void sendDataToQueue(String queueKey, Object object) { try { amqpTemplate.convertAndSend(queueKey, object); } catch (Exception e) { LOG.error("ERROR", e); } } }

3、消费者

3.1 使用Listenter 的方式请自行放开spring配置文件中说明部分 3.2 使用java

package com.sdnware.start04.rabbitmq.spring; public interface MQConsumer { /** * * desc:接收消息 * author:chen.bob * time:2017年5月5日 下午2:41:41 * @param queueKey * @param object */ void recvDataToQueue(String queueKey); } package com.sdnware.start04.rabbitmq.spring; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; public class MQConsumerImpl implements MQConsumer { private static Logger LOG = LoggerFactory.getLogger(MQConsumerImpl.class); private AmqpTemplate amqpTemplate; public MQConsumerImpl(AmqpTemplate amqpTemplate) { super(); this.amqpTemplate = amqpTemplate; } @Override public void recvDataToQueue(String queueName) { try { Object receiveAndConvert = amqpTemplate.receiveAndConvert(queueName); LOG.info("接收到消息:"+receiveAndConvert); } catch (Exception e) { LOG.error("ERROR", e); } } }

4、测试

package com.sdnware.start04.rabbitmq.spring; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Run { private static ApplicationContext classPathXmlApplicationContext; private static String key = "test_queue_key"; static{ classPathXmlApplicationContext = new ClassPathXmlApplicationContext("spring-rabbitmq.xml"); } public static void main(String[] args) throws Exception { AmqpTemplate amqpTemplate = classPathXmlApplicationContext.getBean("amqpTemplate",AmqpTemplate.class); MQProducer mqProducer = new MQProducerImpl(amqpTemplate); Map<String,String> message = new HashMap<String,String>(); message.put("title", "this is a message"); mqProducer.sendDataToQueue(key, message); //以下部分,使用Listenter方式请自行注释 TimeUnit.SECONDS.sleep(5); MQConsumer mqConsumer = new MQConsumerImpl(amqpTemplate); mqConsumer.recvDataToQueue("test_queue"); } }

5、资料

RabbitMQ Exchange类型详解 http://www.cnblogs.com/julyluo/p/6265775.html

转载请注明原文地址: https://www.6miu.com/read-30078.html

最新回复(0)