spring integration-rmi

xiaoxiao2025-10-02  2

1.spring integrtion 说明

2.概念

2.1 Message

Header 包含通用的信息例如

idtimestampcorrelation idreturn address

Payload 则是POJO对象,例如User

public class User { private int id; private String name; //getter and setter }
2.2 Message Channel

Channel 分为

P2P channel里的消息只能由一个消费者Publish/Subscribe channel里的消息会广播给所有的订阅者

Pollable Channels 会缓存消息到一个队列里,避免发送过量的消息给消费者而导致消费者崩溃

2.3 Message Endpoint

消息的终端,就是消息的消费者,分为

Transformer :将channel里的消息进行转换 例如 xml 转成JsonFilter :过滤哪些消息是想要消费的Router :路由,将消息转到哪些管道里Splitter :消息分割Aggregator :消息汇总Service Activator : 比较重要,这个一般配置在我们已经写好的服务,想暴露出去Channel Adapter :不是很明白

这些类似流的处理,用官方的sample 点cafe来表示

流程是

调用Cafe这个网关类去点cafe(封装成为上面提到的概念里的消息-Message)cafe网关有个关联通道是orders,cafe消息会发送到orders这个通道里orderSplitter是上面说的Message Endpoint里的Splitter,将消息组装成List放到drinks通道里DrinkRouter会订阅drinks通道里的信息,按冷热进行路由到不同的通道里hotDrinkBarista或coldDrinkBaristaBarista这个是订阅通道hotDrinkBarista或coldDrinkBarista来处理产生drink,放到preparedDrinks通道里Waiter会对所有preparedDrinks通道里的drink集合最后发到deliveries网关

这个例子很好的解释了 message gateway ,channal, message,message endpoint的几个重要概念

3.重要的类

3.1 消息网关(Messaging Gateways)

网关最重要的功能就是屏蔽调用channel细节,比如 MessageChannel 的send(Message msg)方法。 而是改成接口的方式,然后注册成为一个Bean,直接调用即可

4.例子

原来写了个UserService,很简单,只有加user和查找user2个功能

4.1 RMI 调用的例子

现在要暴露出来给其它系统用。原来的代码如下

@Service public class UserService { private ConcurrentHashMap<Integer,User> map = new ConcurrentHashMap(); public void addUser(User user){ map.put(user.getId(),user); } public User findUserById(int id){ return map.get(id); } }

依照spring integration不侵入的原则,只需要配置消息的终端到这个类上,即加上 @ServiceActivator,这个annotation是上面提到的消息终端的一种,监听到addUser和findUserById这2个通道

@Service public class UserService { private ConcurrentHashMap<Integer,User> map = new ConcurrentHashMap(); @ServiceActivator(inputChannel = "addUser") public void addUser(User user){ map.put(user.getId(),user); } @ServiceActivator(inputChannel = "findUserById") public User findUserById(int id){ return map.get(id); } } public class User { private int id; private String name; //getter and setter }

启动springboot, RmiInboundGateway网关会绑定一个rmi-input。

@SpringBootApplication public class ServerApplication { public static void main(String[] args) { SpringApplicationBuilder builder =new SpringApplicationBuilder(ServerApplication.class); builder.run(args); } @Bean public RmiInboundGateway inbound(@Qualifier("rmi-input") MessageChannel channel) { RmiInboundGateway gateway = new RmiInboundGateway(); gateway.setRequestChannel(channel); //默认就是1099 gateway.setRegistryPort(1099); return gateway; } @Bean("rmi-input") public MessageChannel requestChannel() { PublishSubscribeChannel channel = new PublishSubscribeChannel(); return channel; } }

@Router这个message endpoint监听的rmi-input,然后路由到不同channel,这里是找到message里的头部里的method值

@Component public class UserServiceRouter { @Router(inputChannel = "rmi-input") public String route(Message msg){ System.out.println("msg payload : "+msg.getPayload()); System.out.println("msg method : "+msg.getHeaders().get("method")); return msg.getHeaders().get("method").toString(); } }

另个项目里Client需要去调用这个UserService

@SpringBootApplication public class ClientApplication { public static void main(String[] args) { SpringApplicationBuilder builder =new SpringApplicationBuilder(ClientApplication.class); ConfigurableApplicationContext context = builder.run(args); RmiOutboundGateway gw = (RmiOutboundGateway)context.getBean("outbound"); Message<User> msg =MessageBuilder.withPayload(new User(1,"rechard")) .setHeader("method","addUser") .build(); gw.handleRequestMessage(msg); Message<Integer> msg2 =MessageBuilder.withPayload(1) .setHeader("method","findUserById") .build(); Object obj = gw.handleRequestMessage(msg2); System.out.println("return obj:"+obj); } @Bean @ServiceActivator(inputChannel="inChannel") public RmiOutboundGateway outbound() { //DESKTOP-TTESTAQ 是机器名,也可以是IP地址 RmiOutboundGateway gateway = new RmiOutboundGateway("rmi://DESKTOP-TTESTAQ/org.springframework.integration.rmiGateway.rmi-input"); return gateway; } }

整个流程(粗体代表channel)

RmiOutboundGateway->rmi://DESKTOP-TTESTAQ/org.springframework.integration.rmiGateway.rmi-input->RmiInboundGateway->rmi-input->UserServiceRouter->addUser or findUserById->UserService

转载请注明原文地址: https://www.6miu.com/read-5037251.html

最新回复(0)