Header 包含通用的信息例如
idtimestampcorrelation idreturn addressPayload 则是POJO对象,例如User
public class User { private int id; private String name; //getter and setter }Channel 分为
P2P channel里的消息只能由一个消费者Publish/Subscribe channel里的消息会广播给所有的订阅者Pollable Channels 会缓存消息到一个队列里,避免发送过量的消息给消费者而导致消费者崩溃
消息的终端,就是消息的消费者,分为
Transformer :将channel里的消息进行转换 例如 xml 转成JsonFilter :过滤哪些消息是想要消费的Router :路由,将消息转到哪些管道里Splitter :消息分割Aggregator :消息汇总Service Activator : 比较重要,这个一般配置在我们已经写好的服务,想暴露出去Channel Adapter :不是很明白这些类似流的处理,用官方的sample 点cafe来表示
流程是
调用Cafe这个网关类去点cafe(封装成为上面提到的概念里的消息-Message)cafe网关有个关联通道是orders,cafe消息会发送到orders这个通道里orderSplitter是上面说的Message Endpoint里的Splitter,将消息组装成List放到drinks通道里DrinkRouter会订阅drinks通道里的信息,按冷热进行路由到不同的通道里hotDrinkBarista或coldDrinkBaristaBarista这个是订阅通道hotDrinkBarista或coldDrinkBarista来处理产生drink,放到preparedDrinks通道里Waiter会对所有preparedDrinks通道里的drink集合最后发到deliveries网关这个例子很好的解释了 message gateway ,channal, message,message endpoint的几个重要概念
网关最重要的功能就是屏蔽调用channel细节,比如 MessageChannel 的send(Message msg)方法。 而是改成接口的方式,然后注册成为一个Bean,直接调用即可
原来写了个UserService,很简单,只有加user和查找user2个功能
现在要暴露出来给其它系统用。原来的代码如下
@Service public class UserService { private ConcurrentHashMap<Integer,User> map = new ConcurrentHashMap(); public void addUser(User user){ map.put(user.getId(),user); } public User findUserById(int id){ return map.get(id); } }依照spring integration不侵入的原则,只需要配置消息的终端到这个类上,即加上 @ServiceActivator,这个annotation是上面提到的消息终端的一种,监听到addUser和findUserById这2个通道
@Service public class UserService { private ConcurrentHashMap<Integer,User> map = new ConcurrentHashMap(); @ServiceActivator(inputChannel = "addUser") public void addUser(User user){ map.put(user.getId(),user); } @ServiceActivator(inputChannel = "findUserById") public User findUserById(int id){ return map.get(id); } } public class User { private int id; private String name; //getter and setter }启动springboot, RmiInboundGateway网关会绑定一个rmi-input。
@SpringBootApplication public class ServerApplication { public static void main(String[] args) { SpringApplicationBuilder builder =new SpringApplicationBuilder(ServerApplication.class); builder.run(args); } @Bean public RmiInboundGateway inbound(@Qualifier("rmi-input") MessageChannel channel) { RmiInboundGateway gateway = new RmiInboundGateway(); gateway.setRequestChannel(channel); //默认就是1099 gateway.setRegistryPort(1099); return gateway; } @Bean("rmi-input") public MessageChannel requestChannel() { PublishSubscribeChannel channel = new PublishSubscribeChannel(); return channel; } }@Router这个message endpoint监听的rmi-input,然后路由到不同channel,这里是找到message里的头部里的method值
@Component public class UserServiceRouter { @Router(inputChannel = "rmi-input") public String route(Message msg){ System.out.println("msg payload : "+msg.getPayload()); System.out.println("msg method : "+msg.getHeaders().get("method")); return msg.getHeaders().get("method").toString(); } }另个项目里Client需要去调用这个UserService
@SpringBootApplication public class ClientApplication { public static void main(String[] args) { SpringApplicationBuilder builder =new SpringApplicationBuilder(ClientApplication.class); ConfigurableApplicationContext context = builder.run(args); RmiOutboundGateway gw = (RmiOutboundGateway)context.getBean("outbound"); Message<User> msg =MessageBuilder.withPayload(new User(1,"rechard")) .setHeader("method","addUser") .build(); gw.handleRequestMessage(msg); Message<Integer> msg2 =MessageBuilder.withPayload(1) .setHeader("method","findUserById") .build(); Object obj = gw.handleRequestMessage(msg2); System.out.println("return obj:"+obj); } @Bean @ServiceActivator(inputChannel="inChannel") public RmiOutboundGateway outbound() { //DESKTOP-TTESTAQ 是机器名,也可以是IP地址 RmiOutboundGateway gateway = new RmiOutboundGateway("rmi://DESKTOP-TTESTAQ/org.springframework.integration.rmiGateway.rmi-input"); return gateway; } }整个流程(粗体代表channel)
RmiOutboundGateway->rmi://DESKTOP-TTESTAQ/org.springframework.integration.rmiGateway.rmi-input->RmiInboundGateway->rmi-input->UserServiceRouter->addUser or findUserById->UserService