In the previous post we learned about supervision and how to restart a behavior when it fails. Closely related are the actor’s lifecycle and how to watch an actor so that you are notified when it terminates.
Classic untyped actors have four lifecycle hooks: preStart, preRestart, postRestart and postStop. Some of those call each other, and your head probably starts to spin as soon as you try to recall when each one is invoked. In Akka Typed these are represented as messages and reduced to only two: PreRestart and PostStop.
preStart and postRestart are replaced by placing such startup code in the constructor of the behavior, or more typically in a deferred behavior. Actor.deferred is like a factory for a behavior. Creation of the behavior instance is deferred until the actor is started, as opposed to Actor.immutable, which creates the behavior instance immediately, before the actor is running.
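To make the factory idea concrete, here is a minimal sketch in plain Scala, with no Akka involved. The names Worker and Cell are hypothetical stand-ins invented for this illustration. It also assumes that the factory runs again on restart, which is consistent with how startup code in a deferred behavior takes over the role of postRestart.

```scala
object DeferredSketch {
  // Hypothetical stand-in for a behavior instance that is built at startup.
  final class Worker(val generation: Int)

  // Hypothetical mini "runtime": it holds a factory instead of an instance
  // and only calls the factory when the actor is started or restarted.
  final class Cell(factory: () => Worker) {
    def start(): Worker = factory()
    def restart(): Worker = factory()
  }

  // Returns how many workers had been created before start,
  // and the generation of the worker seen after start and after restart.
  def demo(): (Int, Int, Int) = {
    var created = 0
    val cell = new Cell(() => { created += 1; new Worker(created) })
    val beforeStart = created // defining the factory creates nothing yet
    val afterStart = cell.start().generation
    val afterRestart = cell.restart().generation
    (beforeStart, afterStart, afterRestart)
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```

The point of the sketch is only the timing: nothing is created when the factory is defined, and each start or restart gets a fresh instance.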
Obviously, PreRestart is received when the behavior is restarted and PostStop when it’s stopped, but it’s worth noting that PostStop is not signalled when the behavior is restarted.
The typical usage of PreRestart and PostStop is to close resources that the actor has been using. For example, the FlakyWorker from the previous post can be expanded to write the jobs to a file, which must be closed when the actor is stopped or restarted.
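// The akka.typed imports assume the package layout of the Akka 2.5.x
// release that this series was written against.
import java.io.{ FileWriter, PrintWriter }
import java.util.concurrent.ThreadLocalRandom

import akka.typed.{ Behavior, PostStop, PreRestart }
import akka.typed.scaladsl.Actor

object FlakyWorker2 {
  sealed trait Command
  final case class Job(payload: String) extends Command

  // Runs each time the actor is started: open the output file in append mode.
  val workerBehavior: Behavior[Command] = Actor.deferred { ctx =>
    ctx.system.log.info("Worker {} is STARTED", ctx.self)
    val out = new PrintWriter(new FileWriter(
      s"target/out-${ctx.self.path.name}.txt", true))
    active(count = 1, out)
  }

  private def active(count: Int, out: PrintWriter): Behavior[Command] =
    Actor.immutable[Command] { (ctx, msg) =>
      msg match {
        case Job(payload) =>
          // Fail roughly one time in five to exercise supervision.
          if (ThreadLocalRandom.current().nextInt(5) == 0)
            throw new RuntimeException("Bad luck")

          ctx.system.log.info("Worker {} got job {}, count {}", ctx.self, payload, count)
          out.println(s"Worker ${ctx.self} got job $payload, count $count")
          active(count + 1, out)
      }
    } onSignal {
      // Close the file on both restart and stop; only one of the two is signalled.
      case (ctx, PreRestart) =>
        ctx.system.log.info("Worker {} is RESTARTED, count {}", ctx.self, count)
        out.close()
        Actor.same
      case (ctx, PostStop) =>
        ctx.system.log.info("Worker {} is STOPPED, count {}", ctx.self, count)
        out.close()
        Actor.same
    }
}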
Note how the file is opened in deferred when the actor is started and closed when receiving PreRestart and PostStop. I said that those are messages. Well, they do not conform to the message type of the behavior, so they can’t be handled in the ordinary onMessage function. Instead we call them signals and they are handled in a separate onSignal function. onSignal is a partial function, as opposed to onMessage, which is an ordinary total function, because you probably only want to handle a few of the defined signals. The total set of signal types is defined by Akka and more signal types may be added in future versions. You don’t want match errors because of unhandled signals.
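The partial function argument can be illustrated without Akka. The sketch below uses a hypothetical Signal hierarchy defined only for this example (it is not Akka's own) and a hypothetical dispatch helper that plays the role of the runtime, falling back to a default when the handler is not defined for a signal.

```scala
object SignalHandlingSketch {
  // Hypothetical stand-ins for signal types, for illustration only.
  sealed trait Signal
  case object PreRestart extends Signal
  case object PostStop extends Signal
  final case class Terminated(name: String) extends Signal

  // A partial function lists only the signals this behavior cares about.
  val onSignal: PartialFunction[Signal, String] = {
    case PreRestart => "close resources before restart"
    case PostStop   => "close resources after stop"
  }

  // The caller supplies a fallback, so a signal that is not listed
  // is reported as unhandled instead of throwing a MatchError.
  def dispatch(signal: Signal): String =
    onSignal.applyOrElse(signal, (_: Signal) => "unhandled")

  def main(args: Array[String]): Unit = {
    println(dispatch(PostStop))
    println(dispatch(Terminated("worker-1")))
  }
}
```

If a new signal type were added to the hierarchy later, the handler above would keep working unchanged, which is the property the paragraph above is after.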
Let’s expand the FlakyWorker example even further. We would like to have several workers, each one responsible for a “partition” of jobs, to spread the load and, in this specific case, to write jobs for different partitions to separate files. For this we introduce a parent manager actor that spawns workers depending on the partition of an incoming job and delegates jobs to the right worker.
The manager is supervising the workers and restarting them if they fail, but with some limits. After two failures within 1 second the worker is stopped instead of restarted.
restartWithLimit(maxNrOfRetries = 2, 1.second)
The fact that a worker has been stopped is something that the manager should know about. That is not solved by supervision. Instead it needs to watch the workers. You already know that concept from untyped actors. It is exactly the same with typed actors. The full manager looks like this:
// The akka.typed imports assume the package layout of the Akka 2.5.x
// release that this series was written against.
import scala.concurrent.duration._

import akka.typed.{ ActorRef, Behavior, SupervisorStrategy, Terminated }
import akka.typed.scaladsl.{ Actor, ActorContext }

object WorkerManager {
  sealed trait Command
  final case class Job(partition: Int, payload: String) extends Command

  private val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 2, 1.second)

  // Wrap the worker so that RuntimeExceptions are handled by the restart strategy.
  private val worker: Behavior[FlakyWorker2.Command] =
    Actor.restarter[RuntimeException](strategy).wrap(FlakyWorker2.workerBehavior)

  val workerManagerBehavior: Behavior[Command] =
    active(Map.empty)

  // Spawn a worker for the partition and watch it to learn when it terminates.
  private def spawnWorker(partition: Int, ctx: ActorContext[Command]): ActorRef[FlakyWorker2.Command] = {
    val w = ctx.spawn(worker, s"worker-$partition")
    ctx.watch(w)
    w
  }

  private def active(workers: Map[Int, ActorRef[FlakyWorker2.Command]]): Behavior[Command] = {
    Actor.immutable[Command] { (ctx, msg) =>
      msg match {
        case job @ Job(partition, payload) =>
          // Reuse the worker for this partition, or spawn one on first use.
          val (w, newWorkers) = workers.get(partition) match {
            case Some(w) =>
              (w, workers)
            case None =>
              val w = spawnWorker(partition, ctx)
              (w, workers.updated(partition, w))
          }
          w ! FlakyWorker2.Job(payload)
          active(newWorkers)
      }
    } onSignal {
      case (ctx, Terminated(ref)) =>
        ctx.system.log.info("Worker {} is TERMINATED", ref)
        // Only the ref is known here, so every entry has to be checked.
        val newWorkers = workers.filterNot { case (_, w) => w == ref }
        active(newWorkers)
    }
  }
}
Note how the worker is watched when it’s spawned and that the Terminated message is handled in onSignal. In this example it’s watching a child actor, but watch can be used on any ActorRef and not only children.
The above code is using filterNot because the partition number is not known, and that would not be very efficient if there were many entries in the Map. We could maintain a bidirectional map (using two maps), but it would be better if the terminated signal could carry the partition number. There is a new feature in Akka Typed that comes in handy for that. You can define an application-specific message to be delivered instead of Terminated. Such a message can carry additional information, such as the partition number in the above example. It is used like this:
// Inside object WorkerManager, replacing spawnWorker and active from above.
private final case class WorkerStopped(partition: Int) extends Command

private def spawnWorker(partition: Int, ctx: ActorContext[Command]): ActorRef[FlakyWorker2.Command] = {
  val w = ctx.spawn(worker, s"worker-$partition")
  // Deliver WorkerStopped(partition) instead of the Terminated signal.
  ctx.watchWith(w, WorkerStopped(partition))
  w
}

private def active(workers: Map[Int, ActorRef[FlakyWorker2.Command]]): Behavior[Command] = {
  Actor.immutable[Command] { (ctx, msg) =>
    msg match {
      case job @ Job(partition, payload) =>
        val (w, newWorkers) = workers.get(partition) match {
          case Some(w) =>
            (w, workers)
          case None =>
            val w = spawnWorker(partition, ctx)
            (w, workers.updated(partition, w))
        }
        w ! FlakyWorker2.Job(payload)
        active(newWorkers)
      case WorkerStopped(partition) =>
        ctx.system.log.info("Worker {} is TERMINATED", workers(partition))
        // The message carries the key, so the entry is removed directly.
        active(workers - partition)
    }
  }
}
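The difference between the two bookkeeping styles can be seen in isolation with an ordinary Map. In this sketch plain strings stand in for ActorRef values, and the helper names removeByRef and removeByPartition are made up for the illustration.

```scala
object WorkerMapSketch {
  // Strings stand in for ActorRef values, for illustration only.
  val workers: Map[Int, String] =
    Map(0 -> "worker-0", 1 -> "worker-1", 2 -> "worker-2")

  // With plain watch only the reference is known,
  // so every entry has to be compared against it.
  def removeByRef(ws: Map[Int, String], ref: String): Map[Int, String] =
    ws.filterNot { case (_, w) => w == ref }

  // With watchWith the message carries the key,
  // so the entry can be removed by key without a scan.
  def removeByPartition(ws: Map[Int, String], partition: Int): Map[Int, String] =
    ws - partition

  def main(args: Array[String]): Unit = {
    println(removeByRef(workers, "worker-1"))
    println(removeByPartition(workers, 1))
  }
}
```

Both calls produce the same resulting map; the second one just gets there without visiting every entry.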
A good exercise to fully understand the actor lifecycle concept is to run blog.typed.scaladsl.FlakyWorker2App in patriknw/akka-typed-blog and inspect the log output and the files it creates.
The full source code of these examples, including corresponding Java examples, is available in patriknw/akka-typed-blog.
This post is part of the "Introducing Akka Typed" series. Explore other posts in this series:
- Akka Typed: Hello World in the new API
- Akka Typed: Coexistence
- Akka Typed: Mutable vs. Immutable
- Akka Typed: Protocols
- Akka Typed: Supervision
- → Akka Typed: Lifecycle and Watch