topic exchange对routingKey是有要求的,必须是一个关键字的列表才能发挥正常作用,用“.”分割每个关键字,你可以定义任意的层级,唯一的限制是最大长度为255字节。 topic exchange的通配符使用是在绑定的时候使用的,发消息的时候不起作用。 topic的两个关键字: 1.“”星号,代表一个词,比如上述规则:.error 表示所有系统的error级别的日志 例:one.* 匹配如下: one.two one.three one.four 不匹配: one.two.a one.two.three.four 2.“#”井号,代表零个或多个词,比如上述规则: .# 表示所有系统的所有消息,与单独一个#是等效的,core.# 表示核心系统的所有日志,它和 core. 的区别是,即使以后规则改为 <系统>.<日志级别>.<其他条件>.<其他条件>.……,core.# 依然可以完成匹配,而 core.* 则无法匹配 core.info.xxx.xxx 例:one.* 匹配: one one.two one.two.three 不匹配: two two.one…
消费者和提供者的pom.xml和上一章一样
一.提供者Producer 1.Exchange为topic类型的消息发送者:LogSenderTopic.java
package com.rabbit.exchange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class LogSenderTopic { private Logger logger = LoggerFactory.getLogger(LogSenderFanout.class); //ConnectionFactory和Connection在正式开发时需要设置成单例 private ConnectionFactory connectionFactory; private Connection connection; private Channel channel; /** * 在构造函数中获取连接 */ public LogSenderTopic(){ super(); try { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connection = connectionFactory.newConnection(); channel = connection.createChannel(); } catch (Exception e) { logger.error("获取连接时出错..."); } } /** * 关闭连接的方法 */ public boolean closeAll(){ try { this.channel.close(); this.connection.close(); } catch (Exception e) { logger.error("关闭连接时异常..."); return false; } return true; } /** * 发送消息到交换中心 */ public void sendMessage(String message,String routingKey){ try { //声明一个exchange,名字为logs,类型为fanout channel.exchangeDeclare("logs", "topic"); //发布消息到exchange上 /** * 1.指定exchange的名字 * 2.direct类型 * 3.null... * 3.发送的消息 */ channel.basicPublish("logs", routingKey, null, message.getBytes()); logger.debug("发送direct类型的消息"+message+"到exchange交换中心."); } catch (Exception e) { logger.error("消息发送失败:"+e); } } }测试main方法:ExchangeTopicMain.java
package com.rabbit.main; import com.rabbit.exchange.LogSenderTopic; public class ExchangeTopicMain { public static void main(String[] args) throws InterruptedException { LogSenderTopic logSender = new LogSenderTopic(); //轮流每一秒发送info和error的消息(让消费者1接受info和error级别消息,消费者2只接受info级别消息) while (true) { //匹配结果: //core.*:可以 core.#:可以 //logSender.sendMessage("hello tiglle:core.info","core.info"); //core.*:不行 core.#:可以 logSender.sendMessage("hello tiglle:core.member.info","core.member.info"); //注:生产者不能使用通配符,生产者的通配符定义会被当成普通字符文本 //logSender.sendMessage("hello tiglle:core.#","core.#");//core.#将会被解析成普通文本 Thread.sleep(1000); } } }二.消费者1(使用#多个单词匹配) 1.接收消息的类LogReceiveTopic.java
package com.rabbit.exchange; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class LogReceiveTopic { private Logger logger = LoggerFactory.getLogger(LogReceiveTopic.class); //正式开发ConnectionFactory和Connection应该设置为单例 private ConnectionFactory connectionFactory; private Connection connection; private Channel channel; /** * 在构造函数中获取连接 */ public LogReceiveTopic(){ super();//Objece为其父类... try { connectionFactory = new ConnectionFactory(); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //声明exchange,防止生成者没启动的时候,exchange不存在(生成者和消费者总有一个要先声明) channel.exchangeDeclare("logs", "topic"); } catch (Exception e) { // TODO: handle exception } } /** * 关闭连接的方法 */ public boolean closeAll(){ try { this.channel.close(); this.connection.close(); } catch (Exception e) { logger.error("关闭连接异常:"+e); return false; } return true; } /** * 消费消息 */ public void messageReceive(String routingKey){ try { //获取临时列队:自己声明队列是比较麻烦的, //因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁 String queueName = channel.queueDeclare().getQueue(); //把获取的临时列队绑定到logs这个exchange交换中心,根据通配符绑定:#匹配n个字,*匹配多个字 /** * 1.列队名称 * 2.交换中心的名称 * 3.routingKey和生产者发布消息的时候指定的一样 */ channel.queueBind(queueName, "logs", routingKey); //解决匿名内部类不能使用非final类型 final String tempRoutingKey = routingKey; //定义一个Consumer消费logs的消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { // TODO Auto-generated method stub String message = new String(body,"UTF-8"); logger.debug("我是写硬盘的消费者:"+message+",routingKey为:"+tempRoutingKey); } }; //自动确认为true,接收到消息后该消息就销毁了 channel.basicConsume(queueName, true, consumer); } catch (Exception e) { logger.error("消费消息时异常:"+e); } } }测试启动的main方法:ExchangeTopicMain.java
package com.rabbit.main; import com.rabbit.exchange.LogReceiveTopic; public class ExchangeTopicMain { public static void main(String[] args) { LogReceiveTopic logReceive = new LogReceiveTopic(); //#匹配n个词词词词词词(core.info,core.error,core.member.info,...) logReceive.messageReceive("core.#"); } }三.消费者2(使用*单个单词匹配) 1.接收消息的类:LogReceiveTopic.java
package com.rabbit.exchange; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class LogReceiveTopic { private Logger logger = LoggerFactory.getLogger(LogReceiveTopic.class); //正式开发ConnectionFactory和Connection应该设置为单例 private ConnectionFactory connectionFactory; private Connection connection; private Channel channel; /** * 在构造函数中获取连接 */ public LogReceiveTopic(){ super();//Objece为其父类... try { connectionFactory = new ConnectionFactory(); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //声明exchange,防止生成者没启动的时候,exchange不存在(生成者和消费者总有一个要先声明) channel.exchangeDeclare("logs", "topic"); } catch (Exception e) { // TODO: handle exception } } /** * 关闭连接的方法 */ public boolean closeAll(){ try { this.channel.close(); this.connection.close(); } catch (Exception e) { logger.error("关闭连接异常:"+e); return false; } return true; } /** * 消费消息 */ public void messageReceive(String routingKey){ try { //获取临时列队:自己声明队列是比较麻烦的, //因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁 String queueName = channel.queueDeclare().getQueue(); //把获取的临时列队绑定到logs这个exchange交换中心,只接受info级别日志 /** * 1.列队名称 * 2.交换中心的名称 * 3.routingKey和生产者发布消息的时候指定的一样 */ channel.queueBind(queueName, "logs", routingKey); //解决匿名内部类不能使用非final变量问题 final String tempRoutingKey = routingKey; //定义一个Consumer消费logs的消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { // TODO Auto-generated method stub String message = new String(body,"UTF-8"); logger.debug("我是打印日志的消费者:"+message+",routingKey为:"+tempRoutingKey); } }; //自动确认为true,接收到消息后该消息就销毁了 channel.basicConsume(queueName, true, consumer); } catch (Exception e) { logger.error("消费消息时异常:"+e); } } }2.测试启动的main方法:ExchangeTopicMain.java
package com.rabbit.main; import com.rabbit.exchange.LogReceiveTopic; public class ExchangeTopicMain { public static void main(String[] args) { LogReceiveTopic logReceive = new LogReceiveTopic(); //*匹配1个词词词词词词 logReceive.messageReceive("core.*"); } }//匹配结果: //core.*:可以 core.#:可以 //logSender.sendMessage(“hello tiglle:core.info”,”core.info”); //core.*:不行 core.#:可以 logSender.sendMessage(“hello tiglle:core.member.info”,”core.member.info”); //注:生产者不能使用通配符,生产者的通配符定义会被当成普通字符文本 //logSender.sendMessage(“hello tiglle:core.#”,”core.#”);//core.#将会被解析成普通文本