app/model/Estate.scala (108 lines of code) (raw):

package model

import org.apache.pekko.actor.ActorSystem
import com.amazonaws.services.autoscaling.model.{AutoScalingGroup, DescribeAutoScalingGroupsRequest}
import com.amazonaws.services.ec2.model.{DescribeInstancesRequest, Instance => AwsEc2Instance}
import com.amazonaws.services.sqs.model.{ListQueuesRequest, ListQueuesResult}
import lib.{AWS, ScheduledAgent}
import org.joda.time.DateTime
import play.api.Logger
import play.api.libs.json.{JsValue, Json, OWrites, Writes}
import play.api.libs.ws.WSClient
import scala.jdk.CollectionConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.control.NonFatal

/**
 * Read-only view of the estate, exposed as a map from stage name to [[Stage]].
 *
 * The map is not meant to be modified: `removed` and `updated` always throw
 * `UnsupportedOperationException`.
 */
trait Estate extends Map[String, Stage] {

  /** Whether this estate holds fetched data (as opposed to the pending placeholder). */
  def populated: Boolean

  /** Names of the stages in this estate, in display order. */
  def stageNames: Iterable[String]

  /** Queues known to this estate. */
  def queues: Seq[Queue]

  /** Unsupported: always throws `UnsupportedOperationException`. */
  def removed(key: String): Nothing = throw new UnsupportedOperationException

  /** Unsupported: always throws `UnsupportedOperationException`. */
  def updated[B1 >: Stage](key: String, value: B1): Nothing = throw new UnsupportedOperationException

  /** Every ASG of every stage in this estate. */
  def asgs: Seq[ASG] = values.flatMap(_.asgs).toSeq

  /** When the data was last refreshed, if it has been fetched at all. */
  def lastUpdated: Option[DateTime]
}

/** A named stack together with the ASGs that belong to it. */
case class Stack(name: String, asgs: Seq[ASG])

object Stack {
  // Explicit type: untyped implicits are fragile (warned on in 2.13, rejected in Scala 3).
  implicit val writes: OWrites[Stack] = Json.writes[Stack]
}

/** A stage, made up of the stacks deployed in it. */
case class Stage(stacks: Seq[Stack]) {

  /** Every ASG of every stack in this stage. */
  def asgs: Seq[ASG] = stacks.flatMap(_.asgs)
}

object Stage {

  /** Serialises a stage as a JSON object keyed by stack name. */
  implicit val writes: Writes[Stage] = new Writes[Stage] {
    def writes(stage: Stage): JsValue =
      Json.toJson(stage.stacks.map(stack => stack.name -> stack).toMap)
  }
}

/**
 * An estate built from fetched data.
 *
 * @param asgs           all ASGs in the estate
 * @param queues         all queues in the estate
 * @param lastUpdateTime when this snapshot was assembled
 */
case class PopulatedEstate(override val asgs: Seq[ASG], queues: Seq[Queue], lastUpdateTime: DateTime)
    extends Estate {

  /**
   * One `(stage name, stack)` pair per distinct stage/stack combination; ASGs lacking a
   * stage or stack are filed under "unknown".
   */
  lazy val stacks: Seq[(String, Stack)] =
    asgs
      .groupBy(asg => (asg.stage.getOrElse("unknown"), asg.stack.getOrElse("unknown")))
      .toSeq
      .map { case ((stage, name), stackAsgs) => stage -> Stack(name, stackAsgs) }

  /**
   * Stacks grouped by stage name; within a stage the stacks with the most members come first.
   *
   * The result is materialised with `toMap`: a bare `.view.mapValues(...)` is lazy and would
   * rebuild the `Stage` (including the sort) on every `get` / `iterator` call.
   */
  lazy val stages: Map[String, Stage] = {
    val noStacks = Map.empty[String, Seq[Stack]].withDefaultValue(Seq[Stack]())
    val stacksByStage = stacks.foldLeft(noStacks) { case (acc, (stage, stack)) =>
      acc.updated(stage, acc(stage) :+ stack)
    }
    stacksByStage.view
      .mapValues(stageStacks => Stage(stageStacks.sortBy(stack => -stack.asgs.flatMap(_.members).length)))
      .toMap
  }

  def get(key: String): Option[Stage] = stages.get(key)

  def iterator: Iterator[(String, Stage)] = stages.iterator

  /** Stage names in alphabetical order, except that "PROD" (when present) is listed first. */
  lazy val stageNames: Seq[String] =
    // sortBy is stable and false < true, so "PROD" moves to the front and the rest stay sorted.
    stages.keys.toSeq.sorted.sortBy(_ != "PROD")

  def populated: Boolean = true

  def lastUpdated: Option[DateTime] = Some(lastUpdateTime)
}

/** Empty placeholder estate used as the initial value before any data has been fetched. */
case object PendingEstate extends Estate {
  def get(key: String): Option[Stage] = None
  def iterator: Iterator[(String, Stage)] = Iterator.empty
  def stageNames: Seq[String] = Nil
  def populated: Boolean = false
  def queues: Seq[Queue] = Nil
  def lastUpdated: Option[DateTime] = None
}

/**
 * Holds the current [[Estate]] in a [[lib.ScheduledAgent]] that rebuilds it from EC2 instances
 * and SQS queues.
 *
 * @param asgSource builds ASGs from a group of instances that share the same tags
 */
class EstateProvider(asgSource: ASGSource)(implicit wsClient: WSClient, actorSystem: ActorSystem) {
  val log: Logger = Logger(classOf[EstateProvider])

  implicit val conn = AWS.connection

  // NOTE(review): the arguments are presumably initial delay, refresh interval and initial
  // value - confirm against lib.ScheduledAgent.
  val estateAgent = ScheduledAgent[Estate](0.seconds, 30.seconds, PendingEstate) {
    // Both requests are issued before the for-comprehension so neither waits on the other.
    val instancesFuture: Future[List[AwsEc2Instance]] = EstateInstances.fetchAllInstances()
    val queuesFuture: Future[ListQueuesResult] =
      AWS.futureOf[ListQueuesRequest, ListQueuesResult](conn.sqs.listQueuesAsync, new ListQueuesRequest())

    for {
      instances <- instancesFuture
      nonTerminatedInstances = instances.filterNot(i => i.getState.getName == "terminated")
      tagsToInstances = EstateInstances.groupInstancesByTag(nonTerminatedInstances)
      asgs <- Future.traverse(tagsToInstances.iterator) { case (_, i) => asgSource.fromApp(i) }
      // A failed queue listing must not fail the whole refresh: log it and substitute a
      // single pseudo queue URL that carries the error message.
      queueResult <- queuesFuture.recover { case NonFatal(e) =>
        log.logger.error("Error retrieving queues", e)
        new ListQueuesResult().withQueueUrls(Seq(s"/ERROR ${e.getMessage}").asJava)
      }
      queues <- Future.traverse(queueResult.getQueueUrls.asScala.toSeq)(Queue.from)
    } yield PopulatedEstate(asgs.toList.flatten, queues, DateTime.now)
  }

  /** The most recent estate held by the agent. */
  def apply(): Estate = estateAgent()
}

/** AWS lookups used to assemble the estate. */
object EstateInstances {
  val log: Logger = Logger(classOf[Estate])

  implicit val conn = AWS.connection

  /** Instance tag keys that identify which group an instance belongs to. */
  private val GroupingTagKeys: Set[String] = Set("App", "Stage", "Stack")

  /**
   * Looks up an auto scaling group by name.
   *
   * @return the first group in the response, or `None` when the response contains no groups
   */
  def fetchAsgByName(name: String): Future[Option[AutoScalingGroup]] = {
    val request = new DescribeAutoScalingGroupsRequest().withAutoScalingGroupNames(name)
    AWS.futureOf(conn.autoscaling.describeAutoScalingGroupsAsync, request).map { result =>
      result.getAutoScalingGroups.asScala.headOption
    }
  }

  /**
   * Fetches EC2 instances, following `nextToken` pagination until the response carries no token.
   *
   * @param nextToken pagination token to resume from; `None` starts at the first page
   * @return the instances of every page; instances from later pages precede those of earlier pages
   */
  def fetchAllInstances(nextToken: Option[String] = None): Future[List[AwsEc2Instance]] = {
    val request = new DescribeInstancesRequest
    nextToken.foreach(request.setNextToken)
    AWS.futureOf(conn.ec2.describeInstancesAsync, request).flatMap { result =>
      val instances = result.getReservations().asScala.toList.flatMap(r => r.getInstances().asScala)
      // getNextToken is a Java getter that may return null; Option(...) turns that into None.
      Option(result.getNextToken()) match {
        case None => Future.successful(instances)
        case more @ Some(_) => fetchAllInstances(more).map(_ ++ instances)
      }
    }
  }

  /**
   * Groups instances by their "App", "Stage" and "Stack" tags.
   *
   * @return a map from the tag subset (tag key -> tag value) to the instances carrying exactly
   *         those values; instances with none of these tags are grouped under the empty map
   */
  def groupInstancesByTag(instances: List[AwsEc2Instance]): Map[Map[String, String], List[AwsEc2Instance]] =
    instances.groupBy { instance =>
      instance.getTags.asScala.iterator
        .filter(tag => GroupingTagKeys.contains(tag.getKey))
        .map(tag => tag.getKey -> tag.getValue)
        .toMap
    }
}