app/model/ASG.scala (111 lines of code) (raw):
package model
import scala.concurrent.Future
import lib.{AWS, AmazonConnection, FutureO}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.jdk.CollectionConverters._
import scala.util.Try
import play.api.Logger
import play.api.libs.json._
import controllers.routes
import com.amazonaws.services.cloudwatch.model.{Datapoint, Dimension, GetMetricStatisticsRequest}
import com.amazonaws.services.cloudwatch.model.Statistic._
import com.amazonaws.services.ec2
import org.joda.time.DateTime
import play.api.libs.ws.WSClient
case class ASG(name: Option[String], stage: Option[String], app: Option[String], stack: Option[String],
elb: Option[ELB], members: Seq[ASGMember], recentActivity: Seq[ScalingAction],
cpu: Seq[Datapoint], suspendedActivities: Seq[String], approxMonthlyCost: Option[BigDecimal],
moreDetailsLink: Option[String], hint: Option[String])
object ASG {
import AWS.Writes._
implicit val writes = Json.writes[ASG]
type ASGHint = PartialFunction[Map[String, String], String]
def tagMatcher(tagName: String)(pf: PartialFunction[String, String]): ASGHint = {
case tags if tags.contains(tagName) && pf.isDefinedAt(tags(tagName)) => pf(tags(tagName))
}
val elasticsearchMaster = tagMatcher("es-config:node.master") {
case "true" => "es-master"
}
val elasticsearchHotWarm = tagMatcher("es-config:node.attr.data") {
case s => "es-" + s
}
val hintProvider: ASGHint = elasticsearchMaster orElse elasticsearchHotWarm
}
class ASGSource(cost: AWSCost) {
val log = Logger(classOf[ASGSource])
def fromApp(instances: List[com.amazonaws.services.ec2.model.Instance])(implicit conn: AmazonConnection, ws: WSClient): Future[Seq[ASG]] = {
val instancesByAutoScalingGroupName: Map[Option[String], Seq[ec2.model.Instance]] =
instances.groupBy(_.getTags.asScala.find(_.getKey == "aws:autoscaling:groupName").map(_.getValue))
Future.traverse(instancesByAutoScalingGroupName.toSeq) {
case (autoScalingGroupNameOpt, instancesOfGroupName) => fromInstancesWithAutoscalingGroupName(autoScalingGroupNameOpt, instancesOfGroupName)
}
}
private def fromInstancesWithAutoscalingGroupName(autoScalingGroupNameOpt: Option[String], instances: Seq[ec2.model.Instance])
(implicit conn: AmazonConnection, ws: WSClient): Future[ASG] = {
val tags = instances.flatMap(i => i.getTags.asScala.toList.map(t => t.getKey -> t.getValue)).toMap
val asgFtO = for {
asgName <- FutureO.toFut(autoScalingGroupNameOpt)
asg <- FutureO(EstateInstances.fetchAsgByName(asgName))
} yield asg
val awsAsgInstances = for {
asg <- asgFtO
} yield asg.getInstances.asScala.toList
val elbOptFt: FutureO[ELB] = for {
asg <- asgFtO
elbName <- FutureO.toFut(asg.getLoadBalancerNames.asScala.headOption)
elb <- FutureO.toOpt(ELB.forName(elbName))
} yield elb
val recentActivity = for {
asg <- asgFtO
actions <- FutureO.toOpt(ScalingAction.forGroup(asg.getAutoScalingGroupName))
} yield actions filter (_.isRecent)
val cpuFtO = for {
asg <- asgFtO
stats <- FutureO.toOpt(AWS.futureOf(conn.cloudWatch.getMetricStatisticsAsync, new GetMetricStatisticsRequest()
.withDimensions(new Dimension().withName("AutoScalingGroupName").withValue(asg.getAutoScalingGroupName))
.withMetricName("CPUUtilization").withNamespace("AWS/EC2").withPeriod(60)
.withStatistics(Maximum, Average)
.withStartTime(DateTime.now().minusHours(3).toDate).withEndTime(DateTime.now().toDate)
))
} yield stats.getDatapoints.asScala.toList.sortBy(_.getTimestamp)
val suspendedProcesses = for {
asg <- asgFtO
processes = asg.getSuspendedProcesses.asScala.toList.map(_.getProcessName).sorted
} yield processes
for {
elbOpt <- elbOptFt.futureOption
membersOfElb = elbOpt.map(_.members).getOrElse(Nil)
membersOfASGOpt <- awsAsgInstances.futureOption
membersOfASG = membersOfASGOpt.getOrElse(Nil)
clusterMembers = Future.sequence(instances.map(i => Instance.from(i, cost).map(i => ASGMember.from(i, membersOfASG.find(_.getInstanceId == i.id), membersOfElb.find(_.id == i.id)))))
members <- clusterMembers
activities <- recentActivity.futureOption
cpu <- cpuFtO.futureOption
susPro <- suspendedProcesses.futureOption
} yield {
val moreDetailsLink = ManagementTag(tags.get("Management")).flatMap { t =>
if (t.format == Some("elasticsearch")) {
autoScalingGroupNameOpt.map(name => (routes.Application.es(name).url))
} else None
}
ASG(
name = autoScalingGroupNameOpt,
stage = tags.get("Stage"),
app = tags.get("App") orElse tags.get("Role"),
stack = tags.get("Stack"),
elb = elbOpt,
members = members.sortBy(_.instance.availabilityZone),
recentActivity = activities.getOrElse(Nil),
cpu = cpu.getOrElse(Nil),
suspendedActivities = susPro.getOrElse(Nil),
approxMonthlyCost = Try(members.flatMap(_.instance.approxMonthlyCost).sum).toOption,
moreDetailsLink = moreDetailsLink,
hint = ASG.hintProvider.lift(tags)
)
}
}
}
object FutureOption {
def apply[T](of: Option[Future[T]]): Future[Option[T]] =
of.map(f => f.map(Some(_))) getOrElse (Future.successful(None))
}