app/utils/ScheduledAgent.scala (132 lines of code) (raw):
package utils
import org.apache.pekko.actor.{ActorSystem, Cancellable}
import scala.concurrent.duration._
import scala.language.postfixOps
import org.joda.time.{DateTime, Interval, LocalDate, LocalTime}
import org.apache.pekko.agent.Agent
import scala.concurrent.ExecutionContext
object ScheduledAgent extends LifecycleWithoutApp {
var scheduleSystem: Option[ActorSystem] = None
def apply[T](initialDelay: FiniteDuration, frequency: FiniteDuration)(
block: => T
)(implicit ec: ExecutionContext): ScheduledAgent[T] = {
ScheduledAgent(initialDelay, frequency, block)(_ => block)
}
def apply[T](
initialDelay: FiniteDuration,
frequency: FiniteDuration,
initialValue: T
)(block: T => T)(implicit ec: ExecutionContext): ScheduledAgent[T] = {
ScheduledAgent(
initialValue,
PeriodicScheduledAgentUpdate(block, initialDelay, frequency)
)
}
def apply[T](initialValue: T, updates: ScheduledAgentUpdate[T]*)(implicit
ec: ExecutionContext
): ScheduledAgent[T] = {
new ScheduledAgent(scheduleSystem.get, initialValue, updates: _*)
}
def init(): Unit = {
scheduleSystem = Some(ActorSystem("scheduled-agent"))
}
def shutdown(): Unit = {
scheduleSystem.foreach(_.terminate())
}
}
trait ScheduledAgentUpdate[T] {
def block: T => T
}
case class Update[T](block: T => T) extends ScheduledAgentUpdate[T]
object Update {
def apply[T](block: => Unit): Update[T] = {
Update { t =>
block
t
}
}
}
case class PeriodicScheduledAgentUpdate[T](
block: T => T,
initialDelay: FiniteDuration,
frequency: FiniteDuration
) extends ScheduledAgentUpdate[T]
object PeriodicScheduledAgentUpdate {
def apply[T](initialDelay: FiniteDuration, frequency: FiniteDuration)(
block: T => T
): PeriodicScheduledAgentUpdate[T] =
PeriodicScheduledAgentUpdate(block, initialDelay, frequency)
}
case class DailyScheduledAgentUpdate[T](block: T => T, timeOfDay: LocalTime)
extends ScheduledAgentUpdate[T] {
def timeToNextExecution: FiniteDuration = {
val executionToday = new LocalDate().toDateTime(timeOfDay)
val interval =
if (executionToday.isAfterNow)
// today if before the time of day
new Interval(new DateTime(), executionToday)
else {
// tomorrow if after the time of day
val executionTomorrow =
new LocalDate().plusDays(1).toDateTime(timeOfDay)
new Interval(new DateTime(), executionTomorrow)
}
interval.toDurationMillis milliseconds
}
}
object DailyScheduledAgentUpdate {
def apply[T](timeOfDay: LocalTime)(
block: T => T
): DailyScheduledAgentUpdate[T] = DailyScheduledAgentUpdate(block, timeOfDay)
}
class ScheduledAgent[T](
system: ActorSystem,
initialValue: T,
updates: ScheduledAgentUpdate[T]*
)(implicit ec: ExecutionContext)
extends Logging {
val agent: Agent[T] = Agent[T](initialValue)(ec)
val cancellablesAgent: Agent[Map[ScheduledAgentUpdate[T], Cancellable]] =
Agent[Map[ScheduledAgentUpdate[T], Cancellable]] {
updates.map { update =>
val cancellable = update match {
case periodic: PeriodicScheduledAgentUpdate[_] =>
system.scheduler.scheduleAtFixedRate(
periodic.initialDelay,
periodic.frequency
) { () =>
queueUpdate(periodic)
}
case daily: DailyScheduledAgentUpdate[_] =>
scheduleNext(daily.asInstanceOf[DailyScheduledAgentUpdate[T]])
}
update -> cancellable
}.toMap
}(ec)
def scheduleNext(update: DailyScheduledAgentUpdate[T]): Cancellable = {
val delay = update.timeToNextExecution
log.debug("Scheduling %s to run next in %s".format(update, delay))
system.scheduler.scheduleOnce(delay) {
queueUpdate(update)
val newCancellable = scheduleNext(update)
cancellablesAgent.send(_ + (update -> newCancellable))
}
}
def queueUpdate(update: ScheduledAgentUpdate[T]): Unit = {
agent sendOff { lastValue =>
try {
update.block(lastValue)
} catch {
case t: Throwable =>
log.warn("Failed to update on schedule", t)
lastValue
}
}
}
def get(): T = agent()
def apply(): T = get()
def shutdown(): Unit = {
cancellablesAgent.send { cancellables =>
cancellables.values.foreach(_.cancel())
Map.empty
}
}
}