RabbitMQ 是高级消息队列协议(AMQP)的开源消息代理软件。RabbitMQ 服务器是用 Erlang 语言编写的,消息系统允许软件、应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用, 或者将用户设备和数据进行连接.消息系统通过将消息的发送和接收分离来实现应用程序的异 步和解偶. 或许你正在考虑进行数据投递,非阻塞操作或推送通知。或许你想要实现发布/订阅,异步处理, 或者工作队列。所有这些都可以通过消息实现。 RabbitMQ 是一个消息代理 - 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发 送和接收平台,并且保证消息在传输过程中的安全。 RabbitMQ 是一个消息代理。它的核心原理非常简单:接收和发送消息。你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ就扮演着邮箱、邮局以及邮递员的角色。
可靠性:RabbitMQ 提供了各种功能,让你权衡性能与可靠性,其中包括持久性,交付确认和高可用性。 灵活的路由:消息在到达队列之前,通过交换机的路由。RabbitMQ 为典型的路由逻辑提供了几个内置的交换机类型。对于更复杂的路由,则可以绑定几种交换机一起使用甚至可以自己实现交换机类型,并且把它作为一个插件的来使用。 集群:在本地网络上的几个 RabbitMQ 服务器可以聚集在一起,作为一个独立的逻辑代理来使用。 联合:对于服务器来说,它比集群需要更多的松散和非可靠链接。为此 RabbitMQ 提供了联合模型。 高度可用队列:在群集中,队列可以被镜像到几个机器中,确保您的消息即使在出现硬件故障的安全。 多协议:RabbitMQ 支持上各种消息传递协议的消息传送. 许多客户端:有你能想到的几乎任何语言 RabbitMQ 客户端。 管理用户界面:RabbitMQ 附带一个简单使用管理用户界面,允许您监视和控制您的消息代理的各个方面。 追踪:如果您的消息系统行为异常,RabbitMQ 提供跟踪支持,让你找出问题是什么。
RabbitMQ 使用的是 AMQP 协议。要使用 rabbitmq,你需要一个库来解读这个协议。这里python使用pika
sudo apt-get install python-pip git-core # 安装所需要的依赖 sudo pip install pika==0.9.5
第一个程序send.py
#!/usr/bin/env python
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters( host=’localhost’)) channel = connection.channel() #连接rabbitmq服务器channel.queue_declare(queue=’hello’) #创建一个名为hello的队列channel.basic_publish(exchange=”, routing_key=’hello’, body=’Hello World!’) #消息不能直接发送给队列,需要通过交换机发送给队列(exchange为交换机)print ” [x] Sent ‘Hello World!’”connection.close() #关闭与服务器的连接第二个程序recieve.py
#!/usr/bin/env python import pika
connection = pika.BlockingConnection(pika.ConnectionParameters( host=’localhost’)) channel = connection.channel()channel.queue_declare(queue=’hello’)print ’ [*] Waiting for messages. To exit press CTRL+C’ def callback(ch, method, properties, body): print ” [x] Received %r” % (body,)#创建回调函数,当收到消息,pika库调用回调函数把消息打印到屏幕 channel.basic_consume(callback, queue=’hello’, no_ack=True)#告诉 RabbitMQ 这个回调函数将会从名为 “hello” 的队列中接收消息 channel.start_consuming()#等待消息数据并且在需要的时候运行回调函数的无限循环工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。
new_task.py
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=’localhost’)) channel = connection.channel() message = ’ ‘.join(sys.argv[1:]) or “Hello World!” channel.basic_publish(exchange=”, routing_key=’hello’, body=message) print ” [x] Sent %r” % (message,)
work.py ;消息体中每一个点号(.)模拟1秒钟的操作
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host=’localhost’)) channel = connection.channel() channel.queue_declare(queue=’hello’) print ’ [*] Waiting for messages. To exit press CTRL+C’ def callback(ch, method, properties, body): print ” [x] Received %r” % (body,) time.sleep( body.count(‘.’) ) print ” [x] Done” channel.basic_consume(callback, queue=’hello’, no_ack=True) channel.start_consuming()
shell3$ python new_task.py First message. shell4$ python new_task.py Second message.. shell5$ python new_task.py Third message… shell6$ python new_task.py Fourth message…. shell7$ python new_task.py Fifth message…..
奇数的消息都发送到第一个终端work,偶数的发送到第二个终端work
为了防止消息丢失,RabbitMQ 提供了消息响应(acknowledgments)。消费者会通过一个 ack(响应),告诉 RabbitMQ 已经收到并处理了某条消息,然后 RabbitMQ 就会释放并删除这条消息。 如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ 就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。 消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ 会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。 消息响应默认是开启的。之前的例子中我们可以使用 no_ack=True 标识把它关闭。是时候移除这个标识了,当工作者(worker)完成了任务,就发送一个响应。
通过 basic_ack() 告诉 RabbitMQ 已经收到并处理了某条消息,然后 RabbitMQ 就会释放并删除这条消息。一个很容易犯的错误就是忘了使用 basic_ack() 响应服务端,后果很严重。消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,成为死信,RabbitMQ 就会占用越来越多的内存。 为了排除这种错误,你可以使用 rabbitmqctl 命令,输出 messages_unacknowledged 字段: sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
connection = pika.BlockingConnection(pika.ConnectionParameters( host=’localhost’)) channel = connection.channel() channel.queue_declare(queue=’task_queue’, durable=True)#队列持久化 message = ’ ‘.join(sys.argv[1:]) or “Hello World!” channel.basic_publish(exchange=”, routing_key=’task_queue’, body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print ” [x] Sent %r” % (message,) connection.close()
使用 basic.qos 方法,并设置 prefetch_count=1。这样是告 RabbitMQ ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ 就会把消息分发给下一个空闲的工作者(worker):
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host=’localhost’)) channel = connection.channel() channel.queue_declare(queue=’task_queue’, durable=True) print ’ [*] Waiting for messages. To exit press CTRL+C’ def callback(ch, method, properties, body): print ” [x] Received %r” % (body,) time.sleep( body.count(‘.’) ) print ” [x] Done” ch.basic_ack(delivery_tag = method.delivery_tag)#告诉 RabbitMQ 已经收到并处理了某条消息,然后 RabbitMQ 就会释放并删除这条消息 channel.basic_qos(prefetch_count=1)# 公平调度 channel.basic_consume(callback, queue=’task_queue’) channel.start_consuming()
让我们简单的概括一下之前的教程: 发布者(producer):发布消息的应用程序 队列(queue):用于消息存储的缓冲 消费者(consumer):接收消息的应用程序 RabbitMQ 消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。 发布者(producer)只需把消息发送给一个交换机(exchange )。交换机非常简单,它一边从发布者接收消息,一边把消息消息推送到队列。交换机必须知道如何处理它接收的消息,是应该推送到指定的队列还是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。
有几个可供选择的交换机类型:直连交换机(direct),主题交换机(topic),头交换机(headers)和扇形交换机(fanout)。
我们在这里主要说明最后一个,扇形交换机。先创建一个 fanout 类型的交换机,命令为 logs : channel.exchange_declare(exchange=’logs’, type=’fanout’) 扇形交换机(fanout)很简单,你可能从名字上就能猜测出来,它把消息发送给所有的队列,这正是我们日志系统所需要的。
绑定
我们已经创建了一个扇形交换机( fanout )和一个队列。现在我们需要告诉交换机如何发送消息给我们的队列。 交换机和队列之间的联系我们称之为绑定( binding )。 channel.queue_bind(exchange=’logs’,queue=result.method.queue) 现在,logs 交换机将会把消息加到我们的队列中。
emit_log.py脚本
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=’localhost’)) channel = connection.channel() channel.exchange_declare(exchange=’logs’, type=’fanout’) #声明了个名为logs的扇形交换机 message = ’ ‘.join(sys.argv[1:]) or “info: Hello World!” channel.basic_publish(exchange=’logs’, routing_key=”, body=message) print ” [x] Sent %r” % (message,) connection.close()
recieve_logs.py
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=’localhost’)) channel = connection.channel() channel.exchange_declare(exchange=’logs’, type=’fanout’) result = channel.queue_declare(exclusive=True)#当与消费者(consumer)断开连接的时候,这个队列应当被立即删除。随机生成队列 queue_name = result.method.queue#获得随机生成的队列 channel.queue_bind(exchange=’logs’, queue=queue_name)#绑定 print ’ [*] Waiting for logs. To exit press CTRL+C’ def callback(ch, method, properties, body): print ” [x] %r” % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
打开三个终端,进入代码的目录,分别输入以下三个命令:
sudo service rabbitmq-server start #确保服务已开启 python receive_logs.py > logs_from_rabbit.log #把日志保存到 log 文件里 python receive_logs.py #在屏幕上查看日志 python emit_log.py # 发送日志
我们使用的扇型交换机( fanout exchange )没有足够的灵活性 —— 它能做的仅仅是广播。 直连交换机( direct exchange )来代替。路由的算法很简单 —— 交换机将会对绑定键( binding key )和路由键(routing key )进行精确匹配,从而确定消息该分发到哪个队列。
在这个场景中,我们可以看到直连交换机 X 和两个队列进行了绑定。第一个队列使用 orange 作为绑定键,第二个队列有两个绑定,一个使用 blank 作为绑定键,另一个使用 green 。 这样以来,当路由键为 orange 的消息发布到交换机,就会被路由到 Q1 。路由键为 black 或者 green 的消息就会路由到 Q2.其他的所有消息都将会被丢弃。
emit_log_direct.py的代码:
#!/usr/bin/env python
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=’localhost’)) channel = connection.channel() channel.exchange_declare(exchange=’direct_logs’, type=’direct’) severity = sys.argv[1] if len(sys.argv) > 1 else ‘info’ message = ’ ‘.join(sys.argv[2:]) or ‘Hello World!’ channel.basic_publish(exchange=’direct_logs’, routing_key=severity, body=message) print ” [x] Sent %r:%r” % (severity, message) connection.close()
receive_logs_direct.py的代码:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=’localhost’)) channel = connection.channel() channel.exchange_declare(exchange=’direct_logs’, type=’direct’) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: print >> sys.stderr, “Usage: %s [pub] [msg] [info] [warning] [error]” % \ (sys.argv[0],) sys.exit(1) for severity in severities: channel.queue_bind(exchange=’direct_logs’, queue=queue_name, routing_key=severity) #为我们感兴趣的每个严重级别分别创建一个新的绑定 print ’ [*] Waiting for logs. To exit press CTRL+C’ def callback(ch, method, properties, body): print ” [x] %r:%r” % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
发送到主题交换机(topic exchange)的消息不可以携带随意什么样子的路由键(routing_key),它的路由键必须是一个由.分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。以下是 几个推荐的例子:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。词语的个数可以随意, 但是不要超过255字节。 绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似,一个携带着特定路由键的消息会被主题交 换机投递给绑定键与之想匹配的队列。但是它的绑定键和路由键有两个特殊应用方式: * 用来表示一个单词 # 用来表示任意数量(零个或多个)单词 接下来用图来介绍一下:
主题交换机是很强大的,它可以表现出跟其他交换机类似的行为 当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
receive_logs_topic.py 的代码:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=’localhost’)) channel = connection.channel() channel.exchange_declare(exchange=’topic_logs’, type=’topic’) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: print >> sys.stderr, “Usage: %s [binding_key]…” % (sys.argv[0],) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange=’topic_logs’, queue=queue_name, routing_key=binding_key) print ’ [*] Waiting for logs. To exit press CTRL+C’ def callback(ch, method, properties, body): print ” [x] %r:%r” % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()