第2篇 RabbitMQ简介实现HelloWord

xiaoxiao2021-02-28  87

RabbitMQ简介

1.1、rabbitMQ的优点(适用范围) 1. 基于erlang语言开发具有高可用高并发的优点,适合集群服务器。 2. 健壮、稳定、易用、跨平台、支持多种语言、文档齐全。 3. 有消息确认机制和持久化机制,可靠性高。 4. 开源 其他MQ的优势: 1. Apache ActiveMQ曝光率最高,但是可能会丢消息。 2. ZeroMQ延迟很低、支持灵活拓扑,但是不支持消息持久化和崩溃恢复。

1.2、几个概念说明 producer&Consumer producer指的是消息生产者,consumer消息的消费者。 Queue 消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。 设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失 设置为临时队列,queue中的数据在系统重启之后就会丢失 设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除Exchange

Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。 Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别: Direct 直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue fanout 广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。 topic 主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组) headers 消息体的header匹配(ignore) Binding 所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。 virtual host 在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。

1.3、消息队列的使用过程 1. 客户端连接到消息队列服务器,打开一个channel。 2. 客户端声明一个exchange,并设置相关属性。 3. 客户端声明一个queue,并设置相关属性。 4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。 5. 客户端投递消息到exchange。 6. exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里

操作步骤: 创建连接工厂ConnectionFactory获取连接Connection通过连接获取通信通道Channel声明交换机Exchange:交换机类型分为四类:

FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念

        HeadersExchange :通过添加属性key-value匹配

        DirectExchange:按照routingkey分发到指定队列

        TopicExchange:多关键字匹配

声明队列Queue

将队列和交换机绑定

创建消费者

执行消息的消费

下面来演示一个使用java的简单实例: 1、首先是消息生产者和提供者的基类 package com.lin; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * * 功能概要: EndPoint类型的队列 * * @author linbingwen * @since 2016年1月11日 */ public abstract class EndPoint{ protected Channel channel; protected Connection connection; protected String endPointName; public EndPoint(String endpointName) throws IOException{ this.endPointName = endpointName; //Create a connection factory ConnectionFactory factory = new ConnectionFactory(); //hostname of your rabbitmq server factory.setHost("10.75.4.25"); factory.setPort(5672); factory.setUsername("asdf"); factory.setPassword("123456"); //getting a connection connection = factory.newConnection(); //creating a channel channel = connection.createChannel(); //declaring a queue for this channel. If queue does not exist, //it will be created on the server. channel.queueDeclare(endpointName, false, false, false, null); } /** * 关闭channel和connection。并非必须,因为隐含是自动调用的。 * @throws IOException */ public void close() throws IOException{ this.channel.close(); this.connection.close(); } } 2、消息提供者 package com.lin.producer; import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang.SerializationUtils; import com.lin.EndPoint; /** * * 功能概要:消息生产者 * * @author linbingwen * @since 2016年1月11日 */ public class Producer extends EndPoint{ public Producer(String endPointName) throws IOException{ super(endPointName); } public void sendMessage(Serializable object) throws IOException { channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object)); } } 3、消息消费者 package com.lin.consumer; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.SerializationUtils; import com.lin.EndPoint; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; /** * * 功能概要:读取队列的程序端,实现了Runnable接口 * * @author linbingwen * @since 2016年1月11日 */ public class QueueConsumer extends EndPoint implements Runnable, Consumer{ public QueueConsumer(String endPointName) throws IOException{ super(endPointName); } public void run() { try { //start consuming messages. Auto acknowledge messages. channel.basicConsume(endPointName, true,this); } catch (IOException e) { e.printStackTrace(); } } /** * Called when consumer is registered. */ public void handleConsumeOk(String consumerTag) { System.out.println("Consumer "+consumerTag +" registered"); } /** * Called when new message is available. */ public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException { Map map = (HashMap)SerializationUtils.deserialize(body); System.out.println("Message Number "+ map.get("message number") + " received."); } public void handleCancel(String consumerTag) {} public void handleCancelOk(String consumerTag) {} public void handleRecoverOk(String consumerTag) {} public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {} } 4、测试 package com.lin.test; import java.io.IOException; import java.sql.SQLException; import java.util.HashMap; import com.lin.consumer.QueueConsumer; import com.lin.producer.Producer; public class Test { public Test() throws Exception{ QueueConsumer consumer = new QueueConsumer("queue"); Thread consumerThread = new Thread(consumer); consumerThread.start(); Producer producer = new Producer("queue"); for (int i = 0; i < 1000000; i++) { HashMap message = new HashMap(); message.put("message number", i); producer.sendMessage(message); System.out.println("Message Number "+ i +" sent."); } } /** * @param args * @throws SQLException * @throws IOException */ public static void main(String[] args) throws Exception{ new Test(); } } 其中引入的jar包: <!-- rabbitmq客户端 --> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.0.4</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.1</version> </dependency> </dependencies>

测试结果: 在提供消息 在消费消息  然后同时打开rabbitmq的服务端,输入如下: rabbitmqctl list_queues 这个命令是用来查看服务端中有多处个消息队列的。 可以看到有个名为queue的消息队列(更好的方法是安装好web监控插件,笔者一直安装失败,所以这里就不展示了)

转载请注明原文地址: https://www.6miu.com/read-33767.html

最新回复(0)