Concurrent computing is interesting, and difficult. I still remember the excitement when I first encountered the dining philosophers problem, and then realized that it is just the starting point of a vast literature. Both the practical and theoretical sides are interesting, and problems range from low-level primitives to higher-level abstractions. In this post, I briefly discuss the actor model and my DIY implementation. Of course, we should not reinvent the wheel when there are good wheels out there. However, sometimes it is good to know how to construct one; after all, concurrency is not so simple a wheel. If you already know the actor model and are just curious about a simple implementation, you can skip the post and take a look at my project, conature.
A simple search should give us the Wikipedia article on the Actor model. An actor is an object with an address and an internal state/storage. An actor only acts upon receiving a message: it can send messages to other actors, create new actors, and decide how to handle the next message.
Message sending is asynchronous with unbounded delay, i.e. a message will eventually arrive at its destination actor. We will not concern ourselves with the complicated formal treatment. From a practical perspective, the model intuitively captures real distributed systems. And that leads us to consider some implementations.
Most (all?) implementations do not conform to the theoretical model, the same way computer programs do not conform to the Turing machine. Erlang processes are very close to actors, with process state isolation, concurrent execution, and location transparency all built into the Erlang syntax and runtime. Other implementations live at the library level and are restricted by their host languages. AFAIK, the most successful library is Akka. If you are on the JVM, or you love Scala, or you use Java, you should take a look.
Why do it myself? Because it is interesting. Besides being a consumer of libraries and frameworks, which is essential to get the job done, I would like to understand some pieces under the hood. Bloated third-party dependencies are the norm today; the exception is some big companies where everything is developed in house. This project is my attempt to learn a few basic ideas from the many good ones underlying Akka and Netty.
Actually, Scala and Java are already big dependencies. Consider the scenario if I attempted the project in C++: serialization with Boost or protobuf, cross-platform asynchronous IO with libevent or Asio. Solving any of those problems in a DIY style would be a full project on its own.
Actors are indeed quite easy to set up in Scala, especially if we start with untyped ones and leave out fancy supervision and complex life-cycle management. The main exercise is to write multi-threaded code and try to be lock-free for better performance. A good resource is 1024cores. The next step is typed actors, which is also doable. A good starting point is scalaz-actor, which is small and easy to understand.
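To make the "lock-free mailbox" idea concrete, here is a minimal sketch of an untyped actor. It is not the conature or scalaz-actor code; the names MiniActor and MiniActorDemo are hypothetical, and the design (a ConcurrentLinkedQueue plus an AtomicBoolean that guards scheduling) is just one simple way to guarantee that messages are handled one at a time without locks.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical minimal untyped actor (illustration only, not conature's implementation).
final class MiniActor(handler: Any => Unit, pool: ExecutorService) {
  private val mailbox = new ConcurrentLinkedQueue[Any]() // lock-free queue
  private val scheduled = new AtomicBoolean(false)        // true while a drain task is queued or running

  // Enqueue the message and make sure a drain task will run.
  def !(msg: Any): Unit = {
    mailbox.offer(msg)
    trySchedule()
  }

  // Only the thread that wins the CAS submits a drain task, so at most one runs at a time.
  private def trySchedule(): Unit =
    if (scheduled.compareAndSet(false, true))
      pool.execute(new Runnable { def run(): Unit = drain() })

  private def drain(): Unit =
    try {
      var msg = mailbox.poll()
      while (msg != null) {
        handler(msg)
        msg = mailbox.poll()
      }
    } finally {
      scheduled.set(false)
      // A sender may have enqueued after our last poll but before the flag was reset.
      if (!mailbox.isEmpty) trySchedule()
    }
}

object MiniActorDemo {
  // Several threads send concurrently; the handler counts without any synchronization.
  def countMessages(senders: Int, perSender: Int): Int = {
    val pool = Executors.newFixedThreadPool(4)
    val done = new CountDownLatch(senders * perSender)
    var count = 0 // only touched inside the handler, which never runs concurrently
    val counter = new MiniActor((_: Any) => { count += 1; done.countDown() }, pool)
    val threads = (1 to senders).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = (1 to perSender).foreach(i => counter ! i)
      })
    }
    threads.foreach(_.start())
    done.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    count
  }
}
```

The re-check of the mailbox after resetting the flag is the important detail: without it, a message that arrives between the last poll and the reset could sit in the queue until the next send.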
While the theoretical actor model does not care about timeouts, we should have this feature in practice, otherwise our asynchronous system is close to useless. For this task, the JDK provides a highly precise java.util.Timer. However, for better performance, there is an approximate timer, as described in the paper “Hashed and Hierarchical Timing Wheels” (George Varghese and Tony Lauck).
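As a rough illustration of the hashed timing wheel idea, the sketch below keeps timers in a fixed array of buckets and advances one bucket per tick. It is a simplified, single-threaded sketch and not the conature scheduler; TimingWheel, schedule, tick, and TimingWheelDemo are hypothetical names, and a real scheduler would call tick() from a dedicated thread once every tickMillis.

```scala
import scala.collection.mutable

// Hypothetical sketch of a hashed timing wheel (not conature's scheduler).
final class TimingWheel(slots: Int, tickMillis: Long) {
  // `rounds` counts how many more full turns of the wheel must pass before firing.
  private final class Entry(var rounds: Long, val task: () => Unit)

  private val wheel = Array.fill(slots)(mutable.ListBuffer.empty[Entry])
  private var cursor = 0 // the slot that the next tick() will process

  // Schedule `task` after roughly `delayMillis`, rounded up to whole ticks (at least one).
  def schedule(delayMillis: Long)(task: () => Unit): Unit = {
    val ticks = math.max(1L, (delayMillis + tickMillis - 1) / tickMillis)
    val slot = ((cursor + ticks - 1) % slots).toInt
    val rounds = (ticks - 1) / slots
    wheel(slot) += new Entry(rounds, task)
  }

  // Advance the wheel by one tick: fire entries that are due, age the others.
  def tick(): Unit = {
    val bucket = wheel(cursor)
    val (due, later) = bucket.partition(_.rounds == 0)
    later.foreach(e => e.rounds -= 1)
    bucket.clear()
    bucket ++= later
    cursor = (cursor + 1) % slots
    due.foreach(e => e.task())
  }
}

object TimingWheelDemo {
  // Returns the number of ticks needed before a timer with the given delay fires.
  def firedAt(delayMillis: Long): Int = {
    val wheel = new TimingWheel(slots = 8, tickMillis = 100)
    var fired = false
    wheel.schedule(delayMillis)(() => fired = true)
    var ticks = 0
    while (!fired && ticks < 1000) { wheel.tick(); ticks += 1 }
    ticks
  }
}
```

Scheduling and each tick only touch one bucket, which is where the performance gain comes from; the price is precision, since every timeout is rounded up to the tick size.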
The network layer is built on the Java NIO Selector, TCP only, with default JVM serialization. I do not follow Netty's design of event loop groups and inbound/outbound event listeners. The network server is a single thread managing all IO events (connect, read, write, and timeout), which then calls back into actors for concurrent execution. All connections are closed on read timeout, a decision made after reading some Netty issues. The connections also support two-way communication, which is enabled in the default configuration.
Even a DIY project needs good logging. That’s why I investigated the Disruptor. In the end, I think a simple actor-based event queue is good enough. Combine that with good old java.util.logging and some Scala macros to generate all the if/else checks, and we may have a decent logger.
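The if/else in question is the usual level guard that avoids building a log message nobody will read. The sketch below shows the same guard using by-name parameters instead of a macro, which is a swapped-in technique that is easier to show in a few lines (a macro avoids the closure allocation that by-name parameters incur). LazyLog and LazyLogDemo are hypothetical names, not part of conature.

```scala
import java.util.logging.{Level, Logger}

// Hypothetical wrapper around java.util.logging; the message is only built if the level is enabled.
final class LazyLog(underlying: Logger) {
  def debug(msg: => String): Unit =
    if (underlying.isLoggable(Level.FINE)) underlying.log(Level.FINE, msg)

  def info(msg: => String): Unit =
    if (underlying.isLoggable(Level.INFO)) underlying.log(Level.INFO, msg)
}

object LazyLogDemo {
  // Counts how many message expressions were actually evaluated at the given level.
  def evaluations(level: Level): Int = {
    val jul = Logger.getLogger("lazylog.demo")
    jul.setUseParentHandlers(false) // keep the demo quiet
    jul.setLevel(level)
    val log = new LazyLog(jul)
    var built = 0
    log.debug { built += 1; "expensive debug message" }
    log.info { built += 1; "expensive info message" }
    built
  }
}
```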
It would take a very long post to describe the full implementation. For now, let me skip to some code examples taken from the test suite.
The first example is a pair of Ping-Pong actors. Notice that actors are asynchronous, thus testing must include some wait strategy. One solution is java.util.concurrent.CountDownLatch, used in the project tests. The snippet below just employs Thread.sleep().
    import scala.concurrent.duration.Duration
    import np.conature.actor.{ Behavior, ActorContext, Actor }

    case class PingPongMsg(sender: Actor[PingPongMsg], x: Int)

    class PingPongBehavior(val limit: Int) extends Behavior[PingPongMsg] {
      var i = 0  // last counter value seen by this actor

      override def apply(msg: PingPongMsg) = {
        i = msg.x
        // Echo back with an incremented counter until the limit is reached.
        if (msg.x < limit)
          msg.sender ! PingPongMsg(selfref, msg.x + 1)
        Behavior.same
      }

      // Terminate the actor when no message arrives within the timeout.
      setTimeout(Duration("500ms")) { println(s"$selfref actor is done!"); selfref.terminate() }
    }

    val context = ActorContext.createDefault()
    val a = context.spawn(new PingPongBehavior(4))
    val b = context.spawn(new PingPongBehavior(4))
    a ! PingPongMsg(b, 0)
    Thread.sleep(1000)  // crude wait for the exchange and the timeouts to finish
    context.stop()
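The snippet above waits with Thread.sleep(), which is simple but either too slow or too optimistic. A latch-based wait returns as soon as the expected signals have arrived. The sketch below shows the idea with a plain thread pool instead of actors, so that it is self-contained; LatchWaitDemo and awaitSignals are hypothetical names, and in an actor test the countDown() call would sit inside the behavior.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object LatchWaitDemo {
  // Waits for `expected` signals while `sent` asynchronous jobs each signal once.
  // Returns true if all expected signals arrived before the timeout.
  def awaitSignals(expected: Int, sent: Int, timeoutMillis: Long): Boolean = {
    val pool = Executors.newFixedThreadPool(2)
    val latch = new CountDownLatch(expected)
    try {
      (1 to sent).foreach { _ =>
        pool.execute(new Runnable { def run(): Unit = latch.countDown() })
      }
      // Blocks until the count reaches zero or the timeout elapses.
      latch.await(timeoutMillis, TimeUnit.MILLISECONDS)
    } finally pool.shutdown()
  }
}
```

The timeout on await matters in tests: if a message is lost, the test fails after a bounded time instead of hanging.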
We think of actors as state machines with input messages. State is defined by Behavior[_], and an Actor is created with a starting state/behavior. The ActorContext hosts the execution pool, scheduler, and network support. To keep the design simple, conature does not handle naming of local actors. However, a network actor has an alias URI.
The ping-pong problem: two actors are set up to echo messages back and forth up to some preset limit, then stop sending. The actors are terminated on receive timeout. We need only one message type, PingPongMsg, and one state, PingPongBehavior extends Behavior[PingPongMsg]. When we extend Behavior, the only requirement is to override apply(). We can optionally set up extra internal state, i.e. the variable var i, and the optional timeout. The Behavior trait supports timeout settings, which can be invoked at initialization or during message processing.
The second example is the so-called ask pattern, which is quite important. Recall that actors can only send messages to other actors. From the outside, we can send requests to an actor, but we cannot get the response unless we are also an actor. Thus, we have to embed the return variable into the actor universe: create an actor that closes over the intended return variable, send a message to the target actor to kick off the processing, and check/wait for the variable to be updated. That workflow is captured by the general Promise-Future pattern in asynchronous programming.
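    import scala.concurrent.{ Await, Future }
    import scala.concurrent.duration.Duration
    import np.conature.actor.{ Behavior, ActorContext, Actor }

    case class Request(x: Int, repTo: Actor[Reply])
    case class Reply(x: Int)

    class RequestHandler extends Behavior[Request] {
      def apply(msg: Request) = {
        // Answer the actor named in the request, not the caller directly.
        msg.repTo ! Reply(2 * msg.x)
        Behavior.same
      }
    }

    val context = ActorContext.createDefault()
    val actor = context.spawn(new RequestHandler)
    // The context fills the placeholder with the reply-to actor it creates.
    val fut: Future[Reply] = context.ask(actor, Request(3, _))
    assert(Await.result(fut, Duration.Inf).x == 6)
    context.stop()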
The ask scenario is easier to understand with untyped actors, assuming that we already know the Promise-Future pattern. Since we want types, we have to deal with some awkwardness. The main actor has a Behavior[Request], thus it will only receive Request messages. The reply is sent to the Promise actor to complete the Future value, but that actor is also typed, so we must decide all the types upfront. There is no way for the framework to know what types are used, thus we have to create the correct request ourselves: the context spawns the Promise actor and passes it to our function, which wraps it into the Request. So we have the ask pattern, with this ugly code: context.ask(actor, Request(3, _)).
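To see why the request has to be built from a function, here is a self-contained sketch of a typed ask on top of scala.concurrent.Promise. It does not use conature; Ref is a hypothetical stand-in for a typed actor reference, and ask, doubler, and demo are illustrative names only.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object AskSketch {
  // Hypothetical stand-in for a typed actor reference: something that accepts messages of type A.
  trait Ref[-A] { def !(msg: A): Unit }

  final case class Reply(x: Int)
  final case class Request(x: Int, repTo: Ref[Reply])

  // Creates a one-shot "promise actor", lets the caller wrap it into a request, and sends it.
  def ask[Req, Rep](target: Ref[Req], mkRequest: Ref[Rep] => Req): Future[Rep] = {
    val promise = Promise[Rep]()
    val promiseRef = new Ref[Rep] {
      // The first reply completes the future; later replies are ignored.
      def !(msg: Rep): Unit = { promise.trySuccess(msg); () }
    }
    target ! mkRequest(promiseRef)
    promise.future
  }

  // A stand-in for the request handler: replies asynchronously with twice the input.
  def doubler(implicit ec: ExecutionContext): Ref[Request] = new Ref[Request] {
    def !(msg: Request): Unit = ec.execute(new Runnable {
      def run(): Unit = msg.repTo ! Reply(2 * msg.x)
    })
  }

  def demo(): Int = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val fut: Future[Reply] = ask[Request, Reply](doubler, Request(3, _))
    Await.result(fut, 5.seconds).x
  }
}
```

The function argument Request(3, _) is what ties the two types together: only the caller knows which field of the request carries the reply-to reference.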
A side note: starting from scalaz-actor, I came to a design similar to Akka Typed, then I got stuck at the typed ask pattern. The solution above is just a shameless copy from Akka. However, I am working on another design, which allows a beautiful ask: context.ask(actor, Request(3)). I am checking the typing constraints and writing some more tests. Hopefully everything is correct, and I will introduce that in the next post.
The third example is a network chat, with two clients and one server. The full setup can be found under the systest sub-project. To run the network tests, from the sbt console: systest/multi-jvm:run np.conature.systest.multijvm.TypedChat and systest/multi-jvm:run np.conature.systest.multijvm.Chat. These examples also show another problem with typed actors. Suppose that the server defines a ServerMessage hierarchy. A client must be able to process ServerMessage as well as extra messages for the client’s own logic. Here, I am really looking forward to Scala 3, where union types will shine, i.e. ClientBehavior extends Behavior[ServerMessage | UIInput]. Without that, we need some glue code with an ADT and some adaptation, or we can always resort to the untyped Behavior[Any].
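The ADT glue could look roughly like the sketch below. All message types here are hypothetical (the real ones live in the systest sub-project and may differ); the point is only the shape: one wrapper type for the client, plus an adapter that turns whatever accepts client messages into something that accepts server messages.

```scala
object ChatAdapterSketch {
  // Hypothetical server protocol.
  sealed trait ServerMessage
  final case class Broadcast(from: String, text: String) extends ServerMessage
  final case class Kicked(reason: String) extends ServerMessage

  // Hypothetical client-only input.
  final case class UIInput(line: String)

  // Glue ADT: the client's single message type wraps both sources.
  sealed trait ClientMessage
  final case class FromServer(msg: ServerMessage) extends ClientMessage
  final case class FromUI(input: UIInput) extends ClientMessage

  // The client logic matches on the wrapper, then on the payload.
  def handle(msg: ClientMessage): String = msg match {
    case FromServer(Broadcast(from, text)) => s"[$from] $text"
    case FromServer(Kicked(reason))        => s"disconnected: $reason"
    case FromUI(UIInput(line))             => s"sending: $line"
  }

  // Adaptation: what the server sees only accepts ServerMessage and wraps it on the way in.
  def adapt[R](deliver: ClientMessage => R): ServerMessage => R =
    m => deliver(FromServer(m))
}
```

With a union type, both the wrapper classes and the adapter would disappear, which is the appeal of Behavior[ServerMessage | UIInput].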
I don't think that this post covers everything I intended to present. Maybe I need a series of posts, or better, full documentation. That’s a lot of work, and it might not be worth it given the scope of the project. However, if you are curious about some of the cute ideas mentioned here, you may look at the source code for more details. Currently, it is about 2000 lines of Scala and 1000 lines of Java. I must say, it is nowhere near Akka or Netty, but it should be easier to understand, and it may help you take the first step toward reading the code of those real projects.