Akka 版本:2.3.6(不同版本的 API 有差异,具体实现方式可能不同)
<!-- Akka core actor library (the _2.11 suffix is the Scala binary version). -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.3.6</version>
</dependency>
<!-- Akka remoting module; needed for akka.remote.RemoteActorRefProvider and the netty.tcp transport. -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.3.6</version>
</dependency>
<!-- SLF4J logging integration for Akka. Keep all Akka artifacts on the same version. -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.11</artifactId>
<version>2.3.6</version>
</dependency>
服务端:
package com.akka; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; public class HelloWorld extends UntypedActor{ LoggingAdapter log = Logging.getLogger(getContext().system(), this); public static void main( String[] args ) throws Exception{ ActorSystem system = ActorSystem.create("test",createConfig()); ActorRef actorRef = system.actorOf(Props.create(HelloWorld.class),"helloWorld"); System.out.println(actorRef.path()); actorRef.tell("启动", ActorRef.noSender()); // System.out.println(system.settings()); // actorRef.tell("test1", null); // ActorSelection actorSel = system.actorSelection("akka.tcp://test@127.0.0.1:2552/user/helloWorld"); // actorSel.tell("test client", ActorRef.noSender()); } @Override public void onReceive(Object message) throws Exception { if (message instanceof String) { log.info("Received String message: {}", message); getSender().tell("Received String message:"+message, getSelf()); } else{ log.info("Exception String message: {}", message); } } public static Config createConfig() { Map<String, Object> map = new HashMap<String, Object>(); // map.put("akka.loglevel", "ERROR"); // map.put("akka.stdout-loglevel", "ERROR"); //开启akka远程调用 map.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider"); List<String> remoteTransports = new ArrayList<String>(); remoteTransports.add("akka.remote.netty.tcp"); map.put("akka.remote.enabled-transports", remoteTransports); map.put("akka.remote.netty.tcp.hostname", "127.0.0.1"); map.put("akka.remote.netty.tcp.port", 2552); map.put("akka.remote.netty.tcp.maximum-frame-size", 100 * 1024 * 1024); //forkjoinpool默认线程数 max(min(cpu线程数 * parallelism-factor, parallelism-max), 8) 
map.put("akka.actor.default-dispatcher.fork-join-executor.parallelism-factor", "50"); map.put("akka.actor.default-dispatcher.fork-join-executor.parallelism-max", "50"); return ConfigFactory.parseMap(map); } } 客户端: package com.akka; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; public class Client { public static void main( String[] args ) throws Exception{ ActorSystem system = ActorSystem.create("test",createConfig()); ActorSelection actorSel = system.actorSelection("akka.tcp://test@127.0.0.1:2552/user/helloWorld"); actorSel.tell("test client 1", ActorRef.noSender()); Thread.sleep(2000); actorSel.tell("test client 2", ActorRef.noSender()); Thread.sleep(2000); actorSel.tell("test client 3", ActorRef.noSender()); Thread.sleep(2000); actorSel.tell("test client 4", ActorRef.noSender()); Thread.sleep(2000); actorSel.tell("test client 5", ActorRef.noSender()); // system.shutdown(); } public static Config createConfig() { Map<String, Object> map = new HashMap<String, Object>(); // map.put("akka.loglevel", "ERROR"); // map.put("akka.stdout-loglevel", "ERROR"); //开启akka远程调用 map.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider"); List<String> remoteTransports = new ArrayList<String>(); remoteTransports.add("akka.remote.netty.tcp"); map.put("akka.remote.enabled-transports", remoteTransports); map.put("akka.remote.netty.tcp.hostname", "127.0.0.1"); map.put("akka.remote.netty.tcp.port", 2553); map.put("akka.remote.netty.tcp.maximum-frame-size", 100 * 1024 * 1024); //forkjoinpool默认线程数 max(min(cpu线程数 * parallelism-factor, parallelism-max), 8) map.put("akka.actor.default-dispatcher.fork-join-executor.parallelism-factor", "50"); map.put("akka.actor.default-dispatcher.fork-join-executor.parallelism-max", "50"); return ConfigFactory.parseMap(map); } 
}本来想,在本地起服务,本地调用,但是总是连接不到服务端,无法发送消息。后来找到问题,似乎服务器和客户端都得加配置:
map.put("akka.actor.provider","akka.remote.RemoteActorRefProvider");
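该配置项的默认值是 akka.actor.LocalActorRefProvider(仅支持本地 actor),因此服务端和客户端都必须显式改为 RemoteActorRefProvider,加上之后即可正常通信。另外,客户端 actorSelection 地址中的 hostname(以及端口、ActorSystem 名称)必须与服务端 akka.remote.netty.tcp.hostname 等配置完全一致,例如服务端配置为 127.0.0.1 时客户端不能写成 localhost,否则仍然会连接失败。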
2:become/unbecome
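使用 getContext().become 方法可以用新的消息处理器替换当前的消息处理器。在 Akka 2.3 中,become 默认直接替换当前处理器(discardOld = true);只有调用 become(procedure, false) 时,新的处理器才会被压入一个栈结构、旧的处理器保留在栈中,此时可以用 unbecome 出栈,恢复上一个处理器。使用入栈方式时要保证 become 与 unbecome 成对出现,否则栈会不断增长。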
替换消息处理器之后,后续收到的消息不再走原来的 onReceive 逻辑,而是交给 become 指定的新处理器处理,直到调用 unbecome 或再次调用 become 为止。