RabbitMQ任务分发

xiaoxiao2021-02-28  90

RabbitMQ中文文档 http://rabbitmq.mr-ping.com/

在RabbitMQ的文档中,对于任务分发是放在“工作队列”这一章节介绍的。

在后面介绍扇形、直连、主题交换机时并没有提到,在学习的过程中容易误认为只有默认交换机才具有任务分发机制。

RabbitMQ中消息的分发有3个过程 1. 生产者将消息发送到交换机 2. 交换机将消息分发到队列 3. 队列将消息分发给消费者

在步骤1,生产者使用basic.push方法发送消息,并指定交换机的名称和路由键。RabbitMQ将消息传递给对应的交换机,路由器则会通过路由键进一步将消息传递给队列。

不同的路由器是根据步骤2的行为进行区分的。扇形交换机会忽略路由键,将消息发送给所有绑定的队列(所有与该交换机绑定的队列,会收到同样的消息,类似redis中的发布、订阅机制)。对于直连交换机和主题交换机,在队列绑定交换机时都需要指定路由键,当与步骤1中的路由键存在对应关系时,交换机就会将消息发送给队列。

对于步骤2中的绑定操作,一个交换机可以绑定多个不同的队列,一个队列也可以绑定多个不同的交换机。如果有多个不同的队列,使用相同的路由键绑定路由器,则它们都会受到路由键对应的消息。 如果所示,队列S9b和Agl都使用路由键error绑定了直连交换机X,因此生产者发送路由键为error的消息时,队列S9b和Agl都会收到,消费者C1和C2都会进行处理。

步骤3才是进行任务分发的部分,也就是说任务的分发是指的多个消费者处理同一个队列中的信息时,会进行任务的分发。 例如一共N个队列, 第一个消费者,获取的是第0,N,2N … 个消息 第二个消费者,获取的是第1,N+1,2N+1 … 个消息 … 第N个消费者,获取的是第N-1,2N-1,3N-1 … 个消息

rabbitmq手册中关于直连交换机的文档地址: http://rabbitmq.mr-ping.com/tutorials_with_python/[4]Routing.html

该章节的示例程序中,通过rabbitmq声明了临时队列,因此运行多个consume脚本时,会生成多个不同的队列,并且一个队列只有一个消费者,所以并不会对队列中的任务进行分发。

下面对示例程序中的代码进行扩展,让每个队列对应两个消费者,进行任务的分发。

log_produce.php

生产者发送10条路由键为error的消息,10条路由键为info的消息,到直连路由器direct_logs。

<?php require_once '../vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('direct_logs', 'direct', false, false, false); for($i=0;$i<10;$i++){ //发送4条error消息 $severity = "error"; $data = $severity.$i; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'direct_logs', $severity); echo " [x] Sent ",$severity,':',$data," \n"; } for($i=0;$i<10;$i++){ //发送4条info消息 $severity = "info"; $data = $severity.$i; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'direct_logs', $severity); echo " [x] Sent ",$severity,':',$data," \n"; } $channel->close(); $connection->close();

info_consum.php

消费者处理info_queue中的消息,info_queue队列绑定到直连路由器direct_logs,并指定了error和info这两个路由键,所以info_queue队列会收到生产者发出的20条消息(10条info+10条error)。

<?php require_once '../vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('direct_logs', 'direct', false, false, false); $queue_name = 'info_queue'; $channel->queue_declare($queue_name, false, true, false, false); $severity = "info"; $channel->queue_bind($queue_name, 'direct_logs', $severity); $severity = "error"; $channel->queue_bind($queue_name, 'direct_logs', $severity); echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; $callback = function($msg){ echo ' [x] '.$msg->delivery_info['routing_key'].':'.$msg->body."\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();

error_consume.php

消费者处理error_queue队列,error_queue队列绑定了直连交换机direct_logs,并指定了路由键error,因此direct_logs队列只接受10条error消息。

<?php require_once '../vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('direct_logs', 'direct', false, false, false); $queue_name = 'error_queue'; $severity = "error"; $channel->queue_declare($queue_name, false, true, false, false); $channel->queue_bind($queue_name, 'direct_logs', $severity); echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; $callback = function($msg){ echo ' [x] '.$msg->delivery_info['routing_key'].':'.$msg->body."\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();

在测试时,一个生产者,四个消费者,其中两个消费者用于处理error_queue队列,两个消费者用于处理info_queue队列。

通过测试结果可以看出,info_queue队列绑定直连路由器direct_logs时指定了error和info两个路由键,因此接收到了生产者发出的全部20条消息,由于info_queue队列有两个消费者,因此进行了任务的分发;对于error_queue队列的结论也是类似的。

本文主要是对rabbitmq文档的进一步解释说明,并以直连路由器为例,编写了相关代码进行验证。对于扇形交换机和主题交换机的结论都是类似的。

结论:任务分发是在队列和消费者之间发生的,与交换机类型无关。

转载请注明原文地址: https://www.6miu.com/read-50317.html

最新回复(0)