AKKA 邮箱Mailbox

xiaoxiao2021-02-28  1.1K+

邮箱Mailbox

Akka Mailbox持有发给actor的消息。通常,每一个Actor都与自己的邮箱,但是当使用BalancingPool时,所有的路由都共享一个邮箱实例。

邮箱选择

Actor需要的消息队列类型

特定类型的actor可以用特定类型的消息队列,只要这个actor实现了参数化的接口RequiresMessageQueue。这里是一个例子:

import akka.dispatch.BoundedMessageQueueSemantics;import akka.dispatch.RequiresMessageQueue; public class MyBoundedUntypedActor extends MyUntypedActor  implements RequiresMessageQueue<BoundedMessageQueueSemantics> {}

RequiresMessageQueue接口的类型参数需要在配置中映射到一个邮箱,就像这样:

bounded-mailbox {  mailbox-type = "akka.dispatch.BoundedMailbox"  mailbox-capacity = 1000  mailbox-push-timeout-time = 10s} akka.actor.mailbox.requirements {  "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox}

现在每次你创建一个类型为MyBoundedUntypedActoractor,它都将会尝试获取一个有界邮箱。如果actor在部署时配置了不同的邮箱,可能是直接配置的,也可能是通过带有特定邮箱类型的分发器,那么就会覆写这个映射。

注意

Actor创建的邮箱中的队列类型用接口中要求的类型进行检查,如果队列没有实现要求的类型,那么actor创建就会失败。

Dispatcher需要的消息队列类型

分发器也需要一个邮箱类型,用于运行中的actor。一个例子就是BalancingDispatcher,它需要一个并发的、线程安全的消息队列。这样的需求可以在分发器配置中进行规划,就像这样:

my-dispatcher {  mailbox-requirement = org.example.MyInterface}

给定的需求命名了一个类或者接口,必须保证这个类或者接口是消息队列实现的超类型。万一冲突了,例如如果actor需要一个邮箱类型,但是它不满足这个需求,那么actor创建就会失败。

如何选择邮箱类型

当创建actor时,ActorRefProvider首先确定分发器,分发起会执行actor。然后按照如下顺序确定邮箱类型:

如果actor的部署配置部分包含一个mailbox关键字,那么这个mailbox关键字就指定了要使用的邮箱类型;如果actorProps包含mailbox选择即调用了withMailbox方法那么这个方法指定要使用的邮箱类型;如果分发器的配置部分包含一个mailbox-type关键字,那么这部分也将被用于配置邮箱类型;如果actor需要上面描述的邮箱类型,那么这个需求的映射将被用于确定邮箱类型;如果失败了,那么分发器的需求-如果存在-将被会尝试;如果分发器需要后面描述的邮箱类型,那么这个需求的映射将被用于确定邮箱类型;将使用默认的邮箱akka.actor.default-mailbox

默认邮箱

当按照上述描述的依然没有指定邮箱。那么就会使用默认的邮箱。默认邮箱his一个无界邮箱,是由java.util.concurrent.ConcurrentLinkedQueue支持的。

SingleConsumerOnlyUnboundedMailbox是更高效的邮箱,它可被用于默认邮箱,但是它不能被用于BalancingDispatcher

SingleConsumerOnlyUnboundedMailbox配置为默认邮箱:

akka.actor.default-mailbox {  mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"}

那些配置会传给Mailbox类型

每一个邮箱类型都继承自MailboxType,它的构造函数有两个参数:ActorSystem.Settings对象和Config对象。后面这个是通过actor系统的配置获取,用邮箱类型的配置路径覆盖它的id关键字,并添加一个默认邮箱配置的回调。

内置的Mailbox实现

Akka自带了很多邮箱实现:

UnboundedMailbox (默认) 默认邮箱java.util.concurrent.ConcurrentLinkedQueue支持阻塞: No有界: No配置名: "unbounded"  "akka.dispatch.UnboundedMailbox" SingleConsumerOnlyUnboundedMailbox

这个队列可能会或可能不会比默认邮箱更快,取决于你的使用场景—请确保进行过适当的基准测试!

由多生产者-单消费者队列支持,不能用于BalancingDispatcher阻塞: No有界: No配置名:"akka.dispatch.SingleConsumerOnlyUnboundedMailbox" NonBlockingBoundedMailbox 由非常高效的多生产者-单消费者队列支持阻塞: No (将溢出消息丢弃到死信)有界: Yes配置名:"akka.dispatch.NonBlockingBoundedMailbox" UnboundedControlAwareMailbox 优先派送akka.dispatch.ControlMessage消息由两个java.util.concurrent.ConcurrentLinkedQueue支持阻塞: No有界: No配置名:"akka.dispatch.UnboundedControlAwareMailbox" UnboundedPriorityMailbox java.util.concurrent.PriorityBlockingQueue支持相同优先级的消息的派送顺序未定义-UnboundedStablePriorityMailbox相反阻塞: No有界: No配置名:"akka.dispatch.UnboundedPriorityMailbox" UnboundedStablePriorityMailbox 由包装到akka.util.PriorityQueueStabilizerjava.util.concurrent.PriorityBlockingQueue支持相同优先级的消息保证按照FIFO顺序派送- contrast with the UnboundedPriorityMailbox阻塞: No有界: No配置名:"akka.dispatch.UnboundedStablePriorityMailbox"

其它的有界邮箱实现如果达到最大容量,并且配置了non-zero mailbox-push-timeout-time,会阻塞发送者。

注意

下面的邮箱只应该用于mailbox-push-timeout-time0的情况。

BoundedMailbox java.util.concurrent.LinkedBlockingQueue支持阻塞如果使用non-zero mailbox-push-timeout-timeYes,否则为No有界: Yes配置名:"bounded"  "akka.dispatch.BoundedMailbox" BoundedPriorityMailbox 由包装到akka.util.BoundedBlockingQueuejava.util.PriorityQueue支持相同优先级的消息的派送顺序未定义-BoundedStablePriorityMailbox相反阻塞如果使用了non-zero mailbox-push-timeout-time则为Yes,否则为No有界: Yes配置名:"akka.dispatch.BoundedPriorityMailbox" BoundedStablePriorityMailbox 由包装在akka.util.PriorityQueueStabilizerakka.util.BoundedBlockingQueue中的java.util.PriorityQueue支持相同优先级的消息的派送顺序为FIFO-BoundedPriorityMailbox相反阻塞: Yes 如果使用了non-zero mailbox-push-timeout-time则为Yes,否则为No有界: Yes配置名:"akka.dispatch.BoundedStablePriorityMailbox" BoundedControlAwareMailbox 优先派送akka.dispatch.ControlMessage消息由两个java.util.concurrent.ConcurrentLinkedQueue支持,如果达到最大容量,则阻塞排队阻塞: Yes 如果使用了non-zero mailbox-push-timeout-time则为Yes,否则为No有界: Yes配置名:"akka.dispatch.BoundedControlAwareMailbox"

邮箱配置示例

如果创建PriorityMailbox

import com.typesafe.config.Config;

import akka.actor.ActorSystem;

import akka.actor.PoisonPill;

import akka.dispatch.PriorityGenerator;

import akka.dispatch.UnboundedPriorityMailbox;

 

public class MyPrioMailbox extends UnboundedPriorityMailbox {

    // 用于反射实例化

    public MyPrioMailbox(ActorSystem.Settings settings, Config config) {

        //创建一个新的PriorityGenerator,低优先级意味着更重要

        super(new PriorityGenerator() {

            @Override

            public int gen(Object message) {

                if (message.equals("highpriority"))

                    return 0; // 如果可能的话,高优先级的消息应该被优先处理

                else if (message.equals("lowpriority"))

                    return 2; // 如果低优先级的消息应该被最后处理

                else if (message.equals(PoisonPill.getInstance()))

                    return 3; // 当没有剩余时,则为处理PoisonPill

                else

                    return 1; // 默认位于高优先级和低优先级

            }

        });

    }

}

然后把它添加到配置中:

prio-dispatcher {

  mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"

  //Other dispatcher configuration goes here

}

下面是使用这个邮箱的例子:

import com.typesafe.config.Config;

import com.typesafe.config.ConfigFactory;

 

import akka.actor.ActorRef;

import akka.actor.ActorSystem;

import akka.actor.PoisonPill;

import akka.actor.Props;

import akka.actor.UntypedActor;

import akka.event.Logging;

import akka.event.LoggingAdapter;

 

public class Demo extends UntypedActor {

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    {

        for (Object msg : new Object[] { "lowpriority""lowpriority""highpriority""pigdog""pigdog2""pigdog3",

                "highpriority", PoisonPill.getInstance() }) {

            getSelf().tell(msg, getSelf());

        }

    }

 

    public void onReceive(Object message) {

        log.info(message.toString());

    }

 

    public static void main(String[] args) {

        Config config = ConfigFactory.parseString("akka.loglevel = DEBUG \n" + "akka.actor.debug.lifecycle = on");

        // We create a new Actor that just prints out what it processes

        ActorSystem system = ActorSystem.create("mailbox");

        ActorRef myActor = system.actorOf(Props.create(Demo.class).withDispatcher("prio-dispatcher"), "demo");

        system.terminate();

    }

}

运行输出:

[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] highpriority

[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] highpriority

[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] pigdog

[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] pigdog2

[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] pigdog3

[INFO] [12/24/2016 23:38:22.365] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] lowpriority

[INFO] [12/24/2016 23:38:22.365] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] lowpriority

也可以直接配置邮箱类型,就像这样:

prio-mailbox {

  mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"

  //Other mailbox configuration goes here

}

 

akka.actor.deployment {

  /priomailboxactor {

    mailbox = prio-mailbox

  }

}

然后就可以使用来自部署的邮箱类型:

ActorRef myActor =

  system.actorOf(Props.create(MyUntypedActor.class),

    "priomailboxactor");

或这样:

ActorRef myActor =

  system.actorOf(Props.create(MyUntypedActor.class)

    .withMailbox("prio-mailbox"));

创建自己的邮箱类型

一个值得上千次吹嘘的例子:

import akka.actor.ActorRef;

import akka.actor.ActorSystem;

import akka.dispatch.Envelope;

import akka.dispatch.MailboxType;

import akka.dispatch.MessageQueue;

import akka.dispatch.ProducesMessageQueue;

import com.typesafe.config.Config;

import java.util.concurrent.ConcurrentLinkedQueue;

import java.util.Queue;

import scala.Option;

 

public class MyUnboundedJMailbox implements MailboxType, ProducesMessageQueue<MyUnboundedJMailbox.MyMessageQueue> {

 

    // This is the MessageQueue implementation

    public static class MyMessageQueue implements MessageQueue, MyUnboundedJMessageQueueSemantics {

        private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>();

 

        // these must be implemented; queue used as example

        public void enqueue(ActorRef receiver, Envelope handle) {

            queue.offer(handle);

        }

 

        public Envelope dequeue() {

            return queue.poll();

        }

 

        public int numberOfMessages() {

            return queue.size();

        }

 

        public boolean hasMessages() {

            return !queue.isEmpty();

        }

 

        public void cleanUp(ActorRef owner, MessageQueue deadLetters) {

            for (Envelope handle : queue) {

                deadLetters.enqueue(ownerhandle);

            }

        }

    }

 

    // This constructor signature must exist, it will be called by Akka

    public MyUnboundedJMailbox(ActorSystem.Settings settings, Config config) {

        // put your initialization code here

    }

 

    // The create method is called to create the MessageQueue

    public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {

        return new MyMessageQueue();

    }

}

//Marker interface used for mailbox requirements mapping

public interface MyUnboundedJMessageQueueSemantics {

}

然后只需要将分发器配置或mailbox配置"mailbox-type"的值指定为你的MailboxType的全限定名。

注意

确保包含一个参数为akka.actor.ActorSystem.Settingscom.typesafe.config.ConfigMake的构造函数,因为这个构造函数会被反射调用,以便构建你的邮箱类型。传入的第二参数config是来自于配置中描述使用了这个邮箱类型的分发器和邮箱设置的那部分。对于每一个分发器和邮箱,邮箱类型只实例化一次。

你也可以使用邮箱类型作为分发器的必要条件,就像这样:

custom-dispatcher {

  mailbox-requirement =

  "docs.dispatcher.MyUnboundedJMessageQueueSemantics"

}

 

akka.actor.mailbox.requirements {

  "docs.dispatcher.MyUnboundedJMessageQueueSemantics" =

  custom-dispatcher-mailbox

}

 

custom-dispatcher-mailbox {

  mailbox-type = "docs.dispatcher.MyUnboundedJMailbox"

}

或者在你的actor上定义必备条件,就像这样:

import akka.actor.UntypedActor;

import akka.dispatch.RequiresMessageQueue;

 

public class MySpecialActor extends UntypedActor implements RequiresMessageQueue<MyUnboundedJMessageQueueSemantics> {

    @Override

    public void onReceive(Object arg0throws Exception {

        // TODO Auto-generated method stub

    }

}

system.actorOf的特殊语义

为了让system.actorOf既同步又是非阻塞的,同时保持返回类型ActorRef(和返回的引用功能齐全的语义),这种情况下需要特殊处理。在这些场景背后,构造了虚拟的actor引用,这些引用被发送到系统的守护actor,守护actor实际创建actor和它的上下文,并将它们放入引用内部。Until that has happened, 发送到ActorRef的消息将被本地排队,一旦交换实际的填写,它们将被转移到真正的邮箱。因此,

final Props props = ...

// 这个actor使用MyCustomMailbox,假设它是单例

system.actorOf(props.withDispatcher("myCustomMailbox").tell("bang", sender);

assert(MyCustomMailbox.getInstance().getLastEnqueued().equals("bang"));

可能会失败;你将不得不允许一些时间来匆忙地传入检查并重试。TestKit.awaitCond

 

转载请注明原文地址: https://www.6miu.com/read-1729472.html

最新回复(0)