app/event/Behaviours.scala (70 lines of code) (raw):
package event
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import data.{BakeLogs, Bakes, Dynamo}
import event.BakeEvent._
import models.{AmiId, Bake, BakeStatus, NotificationConfig}
import services.Loggable
import scala.concurrent.ExecutionContext
object Behaviours extends Loggable {
/** Initialises the child actors for each event listener and then switches
* behaviour to `broadcastEvents`
*/
def guardian(
behaviours: Map[String, Behavior[BakeEvent]]
): Behavior[BakeEvent] = Behaviors.setup { context =>
val eventListeners = behaviours.map { case (name, behavior) =>
context.spawn(automaticallyRestart(behavior), name)
}
broadcastEvents(eventListeners)
}
/** Broadcasts all incoming events to all event listeners
*/
def broadcastEvents(
eventListeners: Iterable[ActorRef[BakeEvent]]
): Behavior[BakeEvent] = Behaviors.receiveMessage[BakeEvent] { bakeEvent =>
for (listener <- eventListeners)
listener ! bakeEvent
Behaviors.same
}
/** Ensures that child actors are restarted after encountering an exception
* For more details, see:
* https://doc.akka.io/docs/akka/2.5/typed/fault-tolerance.html
*/
def automaticallyRestart(behavior: Behavior[BakeEvent]): Behavior[BakeEvent] =
Behaviors.supervise(behavior).onFailure(SupervisorStrategy.restart)
/** Forwards all Packer-related logging to Amigo's application logs
*/
val writeToLog: Behavior[BakeEvent] = Behaviors.receiveMessage[BakeEvent] {
message =>
message match {
case Log(_, line) => log.info(s"PACKER: $line")
case AmiCreated(_, amiId) =>
log.info(s"Packer created an AMI! AMI id = ${amiId.value}")
case PackerProcessExited(_, exitCode) =>
log.info(s"Packer process completed with exit code $exitCode")
}
Behaviors.same
}
def sendAmiCreatedNotification(
amiCreated: (Bake, AmiId) => Unit
)(implicit dynamo: Dynamo, ec: ExecutionContext): Behavior[BakeEvent] = {
Behaviors.receiveMessage[BakeEvent] { message =>
message match {
case AmiCreated(bakeId, amiId) =>
log.info("Received an AMI created event")
for {
bake <- Bakes.findById(bakeId.recipeId, bakeId.buildNumber)
} {
log.info(s"Notifying that $amiId exists")
amiCreated(bake, amiId)
}
case _ => // discard
}
Behaviors.same
}
}
/** Writes updates to the appropriate Dynamo records and triggers bake failed
* notifications
*/
def persistBakeEvent(
notificationConfig: Option[NotificationConfig]
)(implicit dynamo: Dynamo, exec: ExecutionContext): Behavior[BakeEvent] =
Behaviors.receiveMessage[BakeEvent] { message =>
message match {
case Log(_, bakeLog) => BakeLogs.save(bakeLog)
case AmiCreated(bakeId, amiId) => Bakes.updateAmiId(bakeId, amiId)
case PackerProcessExited(bakeId, exitCode) =>
val status =
if (exitCode == 0) BakeStatus.Complete else BakeStatus.Failed
Bake.updateStatusAndNotifyFailure(bakeId, status, notificationConfig)
}
Behaviors.same
}
}