RabbitMQ实现异步及同步RPC

xiaoxiao2021-02-28  76

一、同步RPC 客户端:

package com.rabbitmq.synchronization; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.UUID; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.utility.BlockingCell; import com.xinwei.rabbitmq.RabbitConfig; public class RPCClient { private final Map<String,BlockingCell<Object>> _contibuationMap = new HashMap<String, BlockingCell<Object>>(); private int correlationId = 0; private Connection connection; private Channel channel; private String requestQueuename = "rpc_queue"; private String replyQueueName; private DefaultConsumer _consumer; public RPCClient() throws IOException, TimeoutException{ ConnectionFactory factory =new ConnectionFactory(); factory.setHost(RabbitConfig.ip); factory.setPort(RabbitConfig.port); factory.setUsername(RabbitConfig.username); factory.setPassword(RabbitConfig.password); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); _consumer = setupConsumer(); } public String call(String message) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException, TimeoutException{ if(_consumer == null) return null; BlockingCell<Object> k = new BlockingCell<Object>(); BasicProperties props; synchronized (_contibuationMap) { correlationId++; String corrId = "" + correlationId; props = new BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); _contibuationMap.put(corrId, k); } channel.basicPublish("", requestQueuename, props, message.getBytes()); Object reply = k.uninterruptibleGet(4000); if (reply instanceof ShutdownSignalException) { ShutdownSignalException sig = (ShutdownSignalException) reply; ShutdownSignalException wrapper = new ShutdownSignalException( sig.isHardError(), sig.isInitiatedByApplication(), sig.getReason(), sig.getReference()); wrapper.initCause(sig); throw wrapper; } else { return (String)reply; } } public void close() throws IOException{ connection.close(); } protected DefaultConsumer setupConsumer() throws IOException { DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException signal) { synchronized (_contibuationMap) { for (Entry<String, BlockingCell<Object>> entry : _contibuationMap .entrySet()) { entry.getValue().set(signal); } _consumer = null; } } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { synchronized (_contibuationMap) { String replyId = properties.getCorrelationId(); BlockingCell<Object> blocker = _contibuationMap .get(replyId); _contibuationMap.remove(replyId); if (null != body) { String replyBoay = new String(body, "UTF-8"); blocker.set(replyBoay); } else { blocker.set(body); } } } }; channel.basicConsume(replyQueueName, true, consumer); return consumer; } public static void main(String[] args) throws Exception { RPCClient fibrpc = new RPCClient(); System.out.println("request fib(30)"); String response = fibrpc.call("30"); System.out.println("got "+response+","); fibrpc.close(); } }

服务器端

package com.rabbitmq.synchronization; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class RPCServer { public static final String RPC_QUENE_NAME = "rpc_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(RPCConfig.ip); factory.setPort(RPCConfig.port); factory.setUsername(RPCConfig.username); factory.setPassword(RPCConfig.password); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUENE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUENE_NAME, false,consumer); System.out.println("Awaiting RPCrequests"); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build(); String message = new String(delivery.getBody()); int n = Integer.parseInt(message); System.out.println("fib("+message+")"); String response = ""+fib(n); channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } private static int fib(int n) throws Exception{ if(n==0) return 0; if(n==1) return 1; return fib(n-1) + fib(n-2); } }

二、异步RPC 客户端

package com.rabbitmq; import java.io.IOException; import java.util.UUID; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.utility.BlockingCell; public class RPCClient { private Connection connection; private Channel channel; private String requestQueuename = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws IOException, TimeoutException{ ConnectionFactory factory =new ConnectionFactory(); factory.setHost(RabbitConfig.ip); factory.setPort(RabbitConfig.port); factory.setUsername(RabbitConfig.username); factory.setPassword(RabbitConfig.password); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true,consumer); } public String call(String message) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException, TimeoutException{ String response = null; String corrId = UUID.randomUUID().toString(); BasicProperties props = new BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueuename, props, message.getBytes()); BlockingCell<Object> k = new BlockingCell<Object>(); Object reply = k.uninterruptibleGet(); System.out.println("reply:"+reply.toString()); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if(delivery.getProperties().getCorrelationId().equals(corrId)){ response = new String(delivery.getBody()); break; } } return response; } public void close() throws IOException{ connection.close(); } public static void main(String[] args) throws Exception { RPCClient fibrpc = new RPCClient(); System.out.println("request fib(30)"); String response = fibrpc.call("30"); System.out.println("got "+response+","); fibrpc.close(); } }

服务器端

package com.rabbitmq; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class RPCServer { public static final String RPC_QUENE_NAME = "rpc_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(RabbitConfig.ip); factory.setPort(RabbitConfig.port); factory.setUsername(RabbitConfig.username); factory.setPassword(RabbitConfig.password); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUENE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUENE_NAME, false,consumer); System.out.println("Awaiting RPCrequests"); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build(); String message = new String(delivery.getBody()); int n = Integer.parseInt(message); System.out.println("fib("+message+")"); String response = ""+fib(n); channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } private static int fib(int n) throws Exception{ if(n==0) return 0; if(n==1) return 1; return fib(n-1) + fib(n-2); } }
转载请注明原文地址: https://www.6miu.com/read-48139.html

最新回复(0)