Akka, Concurrency, etc.

Dispatcher behavior

February 12, 2018

Overview

The concept of Akka Dispatcher might be unfamiliar to you and it is probably difficult to understand. So I am going to explain how Dispatcher works in detail here.

Meaning of “dispatch”

If you look up the meaning of the word “dispatch” in a dictionary, you would find it is almost same as “send”. In akka, Dispatcher is, yes, what sends messages, but something more than that.

Dispatcher and Actor relationship

dispatcher-config

Firstly, Dispatcher is configured for ActorSystem, typically in application.conf. There is at least default one, and you can also configure multiple Dispatchers.

val system = ActorSystem("exampleSystem")
system.dispatchers.lookup("my-dispatcher")

As a rule of thumb, the Dispatcher instance for the given name is created when the lookup method of ActorSystem is called for the first time. You don’t normally call it yourself, but this lookup is done by akka. Another thing is the default Dispatcher is already created upon ActorSystem initialization, as it calls lookup for the default internally.

/**
* Returns a dispatcher as specified in configuration. Please note that this
* method _may_ create and return a NEW dispatcher, _every_ call.
*
* Throws ConfigurationException if the specified dispatcher cannot be found in the configuration.
*/
def lookup(id: String): MessageDispatcher = lookupConfigurator(id).dispatcher()

dispatcher-actor

Dispatcher is NOT part of Actor. One Dispatcher can send messages to multiple Actors. (NOTE: Dispatcher doesn’t have routing capabilities. Routing is done by akka Router)

Dispatcher and ExecutorService

dispatcher-executor-service

Dispatcher has ExecutorService, and ExecutorService is like a pool of threads where you can execute code (Runnable) concurrently. See Executor/ExecutorService in Java, and ExecutionContext behind Future in Scala for illustration and more details.

Here is executorService method of Dispatcher.

def executorService: ExecutorServiceDelegate = ...

The pool of threads from ExecutorService is what invokes Actor’s receive method, which will be explained later in this article.

Dispatcher and sender-side behavior

actor-cell-reference

Part of below is reharsing what was already discussed in Local Actor workflow part 1 - Sender side, but here more from the Dispatcher perspective.

(For remoting, there are several more steps to go through but it is combination of local message-passing and network via Netty, as discussed in remoting articles)

LocalActorRef is coupled with ActorCell, which is hidden from users as private and it is implementation details of how akka messaging works.

class LocalActorRef(...)
 extends ActorRefWithCell
 with LocalRef  {
  ...
  val actorCell: ActorCell = ...
  ...
}

As you see below, ActorCell has a reference to Dispatcher (val dispatcher: MessageDispatcher).

class ActorCell(
  ...
  val dispatcher:  MessageDispatcher,
  ...
  ) extends ...
  ...
  with dungeon.Dispatch {
    ...
  }

So when you do actorRef ! "hello", that actorRef (whose type is LocalActorRef) already knows what Dispatcher to use via ActorCell.

Also ActorCell extends Dispatch trait and it has a refence to Mailbox, so LocalActorRef also knows which Mailbox to send the massage, via ActorCell.

trait Dispatch { this: ActorCell ⇒
  ...
  def mailbox: Mailbox = ...
  ...
}

This couping of LocalActorRef, ActorCell, and Mailbox is what I meant by Dispatcher doesn’t have routing capabilities in a “NOTE” earlier.

sender

Dispatcher’s dispatch method is as follows:

def dispatch(
  receiver: ActorCell,
  invocation: Envelope
): Unit = {
  val mbox = receiver.mailbox
  mbox.enqueue(receiver.self, invocation)
  registerForExecution(mbox, true, false)
}

where registerForExecution is:

def registerForExecution(mbox: Mailbox, ...): Boolean = {
  ...
  executorService execute mbox
  ...
}

In the above code, Dispatcher’s excutorService is executing mbox: Mailbox, because Mailbox extends ForkJoinTask, which can be execute-d by ExecutorService.

fork-join-1

fork-join-2

abstract class Mailbox(val messageQueue: MessageQueue)
  extends ForkJoinTask[Unit] 
  with SystemMessageQueue 
  with Runnable {
    ...
}

Execution (i.e. processing) of Mailbox is run on a different Thread, which was covered in Local Actor workflow part 2 - Receiver side

Dispatcher and receiver-side behavior

fork-join-3

When run method of ForkJoinTask is executed, the following method of Mailbox is called,

@tailrec private final def processMailbox(
  ...
  // def dequeue(): Envelope = messageQueue.dequeue()
  val next = dequeue() 
  ...
  actor invoke next
  ...
  processMailbox(...)
}

it picks up a message from the message queue, and process it.

receiver

So this processMailbox method, called from ForkJoinTask’s run is what invokes your receive method you defined in your Actor.

class MyActor extends Actor {
  def receive = {
    ...  
  }  
}

Richard Imaoka

Written by Richard Imaoka, a Scala developer . You can find me at twitter, and github.