app/services/PrismData.scala (119 lines of code) (raw):
package services
import org.apache.pekko.actor.{Cancellable, Scheduler}
import models.AmiId
import org.joda.time.DateTime
import play.api.inject.ApplicationLifecycle
import play.api.{Environment, Mode}
import prism.Prism
import prism.Prism.{
AWSAccount,
Image,
Instance,
LaunchConfiguration,
LaunchTemplate
}
import java.util.concurrent.atomic.AtomicReference
import scala.collection.{MapView, SeqLike, SeqOps}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
/** Shared types and helpers for the cached Prism data held by the `PrismData` class. */
object PrismData {

  /** Maximum age, in milliseconds, a cached value may reach before it is rejected as stale. */
  val MAX_AGE: Long = 15.minutes.toMillis

  /** Marker for the reasons a cache may hold no usable value. */
  trait Failure

  /** The cache has not been populated yet. */
  case object NotInitialised extends Failure

  /** Either the reason no value is available, or the value paired with the time it was stored. */
  type CacheData[T] = Either[Failure, (T, DateTime)]

  /** Unwraps a cache entry, rejecting entries that are missing or too old.
    *
    * @param data the cache entry to unwrap
    * @param now  the instant against which the entry's timestamp is compared
    * @param exec execution context; not used by this implementation but part of the signature
    * @return the cached value when it is present and no older than [[MAX_AGE]]
    * @throws IllegalStateException if the entry is a [[Failure]] or its timestamp is more than
    *                               [[MAX_AGE]] milliseconds before `now`
    */
  def dataToResult[T](data: CacheData[T], now: DateTime)(implicit
      exec: ExecutionContext
  ): T =
    data match {
      case Right((value, updatedAt)) =>
        val ageMillis = now.getMillis - updatedAt.getMillis
        if (ageMillis > MAX_AGE)
          throw new IllegalStateException(
            s"AMIgo internal data cache is stale - last update at $updatedAt"
          )
        else value

      case Left(NotInitialised) =>
        throw new IllegalStateException(
          "AMIgo internal data cache is not yet populated"
        )

      case Left(_) =>
        throw new IllegalStateException("CacheData failed for unknown reason")
    }
}
/** In-memory cache of the data sets loaded from Prism.
  *
  * Unless the application runs in `Mode.Test`, constructing this class schedules a task that
  * reloads every data set immediately and then again one minute after each run completes. The
  * task is cancelled by a stop hook registered on the supplied `ApplicationLifecycle`.
  *
  * Every accessor throws `IllegalStateException` when its data set has never been loaded, or when
  * the last successful load is older than `PrismData.MAX_AGE`.
  *
  * @param prism       client used to query Prism
  * @param lifecycle   lifecycle on which the stop hook cancelling the refresh task is registered
  * @param scheduler   scheduler that runs the periodic refresh
  * @param environment used to skip scheduling when running in test mode
  * @param exec        execution context on which Prism responses are processed
  */
class PrismData(
    prism: Prism,
    lifecycle: ApplicationLifecycle,
    scheduler: Scheduler,
    environment: Environment
)(implicit exec: ExecutionContext)
    extends Loggable {
  import PrismData._

  // One holder per data set; each starts as NotInitialised until the first successful refresh.
  private val instancesAgent: AtomicReference[CacheData[Seq[Instance]]] =
    new AtomicReference(Left(NotInitialised))
  private val launchConfigurationsAgent
      : AtomicReference[CacheData[Seq[LaunchConfiguration]]] =
    new AtomicReference(Left(NotInitialised))
  private val launchTemplatesAgent
      : AtomicReference[CacheData[Seq[LaunchTemplate]]] =
    new AtomicReference(Left(NotInitialised))
  private val copiedImagesAgent
      : AtomicReference[CacheData[Map[AmiId, Seq[Image]]]] =
    new AtomicReference(Left(NotInitialised))
  private val accountsAgent: AtomicReference[CacheData[Seq[AWSAccount]]] =
    new AtomicReference(Left(NotInitialised))

  /** Base URL of the Prism service, as reported by the client. */
  val baseUrl: String = prism.baseUrl

  /** All cached instances; throws `IllegalStateException` if the cache is empty or stale. */
  def allInstances: Seq[Instance] =
    dataToResult(instancesAgent.get, DateTime.now)

  /** All cached launch configurations; throws `IllegalStateException` if empty or stale. */
  def allLaunchConfigurations: Seq[LaunchConfiguration] =
    dataToResult(launchConfigurationsAgent.get, DateTime.now)

  /** All cached launch templates; throws `IllegalStateException` if empty or stale. */
  def allLaunchTemplates: Seq[LaunchTemplate] =
    dataToResult(launchTemplatesAgent.get, DateTime.now)

  /** Cached copied images, keyed by the AMI they were copied from and restricted to the
    * requested source AMIs; throws `IllegalStateException` if the cache is empty or stale.
    */
  def copiedImages(sourceAmiIds: Set[AmiId]): Map[AmiId, Seq[Image]] =
    dataToResult(copiedImagesAgent.get, DateTime.now).view
      .filterKeys(sourceAmiIds.contains)
      .toMap

  /** All cached AWS accounts; throws `IllegalStateException` if the cache is empty or stale. */
  def accounts: Seq[AWSAccount] = dataToResult(accountsAgent.get, DateTime.now)

  if (environment.mode != Mode.Test) {
    // Pekko stops rescheduling a fixed-delay task once its Runnable throws, so nothing in this
    // block may throw synchronously; `refresh` converts such failures into failed futures.
    val prismDataSchedule: Cancellable =
      scheduler.scheduleWithFixedDelay(0.seconds, 1.minutes) { () =>
        log.debug(s"Refreshing Prism data")
        refresh(prism.findAllInstances(), instancesAgent, "instances")(
          identity
        )
        refresh(
          prism.findAllLaunchConfigurations(),
          launchConfigurationsAgent,
          "launch configuration"
        )(identity)
        refresh(
          prism.findAllLaunchTemplates(),
          launchTemplatesAgent,
          "launch template"
        )(identity)
        refresh(prism.findCopiedImages(), copiedImagesAgent, "copied image")(
          _.groupBy(_.copiedFromAMI)
        )
        refresh(prism.findAllAWSAccounts(), accountsAgent, "aws accounts")(
          identity
        )
      }

    lifecycle.addStopHook { () =>
      prismDataSchedule.cancel()
      Future.successful(())
    }
  }

  /** Loads one data set and, on success, stores the transformed result with the current time.
    *
    * On failure the previous cache content is left untouched and a warning is logged. This covers
    * a failed future, an exception from `transform`, and a non-fatal exception thrown while
    * `source` is being evaluated. The returned future therefore completes successfully in all of
    * those cases.
    *
    * @param source    by-name call producing the data; evaluated exactly once
    * @param reference cache holder to update on success
    * @param name      label for the data set, used in log messages
    * @param transform conversion from the loaded sequence to the cached representation
    * @return a future that completes once the cache has been updated or the failure logged
    */
  private def refresh[T <: SeqOps[_, Seq, _], R](
      source: => Future[T],
      reference: AtomicReference[CacheData[R]],
      name: String
  )(transform: T => R): Future[Unit] = {
    import scala.util.control.NonFatal

    // Evaluate the by-name argument under a guard: a synchronous throw here would otherwise
    // escape the scheduled Runnable, skip the remaining refreshes and end the schedule.
    val loaded: Future[T] =
      try source
      catch { case NonFatal(e) => Future.failed(e) }

    loaded
      .map { sourceData =>
        log.debug(s"Prism: Loaded ${sourceData.length} $name")
        reference.set(Right(transform(sourceData) -> DateTime.now))
      }
      .recover { case t =>
        log.warn(s"Prism: Failed to update $name: ${t.getLocalizedMessage}")
      }
  }
}