// backend/app/utils/WorkerControl.scala

package utils

import java.time.Instant
import java.util.Date

import org.apache.pekko.actor.{ActorSystem, Cancellable, Scheduler}
import org.apache.pekko.cluster.{Cluster, MemberStatus}
import com.amazonaws.services.autoscaling.AmazonAutoScalingClientBuilder
import com.amazonaws.services.autoscaling.model.{AutoScalingGroup, DescribeAutoScalingGroupsRequest, DescribeScalingActivitiesRequest, SetDesiredCapacityRequest}
import com.amazonaws.services.ec2.AmazonEC2ClientBuilder
import com.amazonaws.util.EC2MetadataUtils
import services.manifest.WorkerManifest
import services.{AWSDiscoveryConfig, IngestStorage, WorkerConfig}
import utils.AWSWorkerControl.{AddNewWorker, RemoveWorker}
import utils.attempt.{Attempt, Failure, IllegalStateFailure}

import scala.jdk.CollectionConverters._
import scala.concurrent.{ExecutionContext, Future}

/**
  * Snapshot of the worker fleet.
  *
  * @param nodes    identifiers of all known worker nodes
  * @param thisNode identifier of the node this process is running on
  */
case class WorkerDetails(nodes: Set[String], thisNode: String)

/** Abstraction over how the worker fleet is discovered and (optionally) scaled. */
trait WorkerControl {
  /** Returns the current set of worker nodes and the identity of this node. */
  def getWorkerDetails(implicit ec: ExecutionContext): Attempt[WorkerDetails]
  /** Starts any periodic control work (e.g. auto-scaling checks) on the given scheduler. */
  def start(scheduler: Scheduler)(implicit ec: ExecutionContext): Unit
  /** Stops any periodic control work started by [[start]]. */
  def stop(): Future[Unit]
}

/**
  * [[WorkerControl]] backed by a Pekko cluster: worker details come from cluster
  * membership, and no scaling is performed by this class.
  */
class PekkoWorkerControl(actorSystem: ActorSystem) extends WorkerControl {
  private val cluster = Cluster(actorSystem)

  override def getWorkerDetails(implicit ec: ExecutionContext): Attempt[WorkerDetails] = Attempt.catchNonFatalBlasé {
    val state = cluster.state
    // Only members that are fully Up count as workers
    val members = state.members.filter(_.status == MemberStatus.Up).map(_.uniqueAddress.toString)
    WorkerDetails(members, cluster.selfUniqueAddress.toString)
  }

  // We don't manually spin up and down the Pekko cluster, it's done for us
  override def start(scheduler: Scheduler)(implicit ec: ExecutionContext): Unit = {}

  override def stop(): Future[Unit] = Future.successful(())
}

/**
  * [[WorkerControl]] backed by AWS: workers are EC2 instances discovered via tags,
  * and the fleet is scaled by adjusting the desired capacity of an auto-scaling group.
  *
  * The periodic control check runs on every instance but only acts on the oldest
  * one, approximating a cluster singleton.
  */
class AWSWorkerControl(config: WorkerConfig, discoveryConfig: AWSDiscoveryConfig, ingestStorage: IngestStorage, manifest: WorkerManifest) extends WorkerControl with Logging {
  val credentials = AwsCredentials()
  val ec2 = AmazonEC2ClientBuilder.standard().withCredentials(credentials).withRegion(discoveryConfig.region).build()
  val autoscaling = AmazonAutoScalingClientBuilder.standard().withCredentials(credentials).withRegion(discoveryConfig.region).build()

  // Handle for the scheduled control loop, set by start() and cancelled by stop()
  var timerHandler: Option[Cancellable] = None

  override def getWorkerDetails(implicit ec: ExecutionContext): Attempt[WorkerDetails] = for {
    myInstanceId <- Attempt.catchNonFatalBlasé {
      EC2MetadataUtils.getInstanceId
    }
    instances <- Attempt.catchNonFatalBlasé {
      AwsDiscovery.findRunningInstances(discoveryConfig.stack, app = "pfi-worker", discoveryConfig.stage, ec2)
    }
  } yield {
    WorkerDetails(instances.map(_.getInstanceId).toSet, myInstanceId)
  }

  override def start(scheduler: Scheduler)(implicit ec: ExecutionContext): Unit = {
    discoveryConfig.workerAutoScalingGroupName match {
      case Some(workerAutoScalingGroupName) =>
        // BUG FIX: previously the Cancellable returned here was discarded, so
        // timerHandler stayed None and stop() could never cancel the control loop.
        timerHandler = Some(scheduler.scheduleWithFixedDelay(config.controlInterval, config.controlInterval)(() => {
          // Only run the check on the oldest instance to get as close as we running the checks as a "singleton"
          if (runningOnOldestInstance()) {
            if (AwsDiscovery.isRiffRaffDeployRunning(discoveryConfig.stack, discoveryConfig.stage, ec2)) {
              logger.info("AWSWorkerControl - not running check as Riff-Raff deploy is running (instances are running and tagged as Magenta:Terminate)")
            } else {
              scaleUpOrDownIfNeeded(workerAutoScalingGroupName)
              breakLocksOnTerminatedWorkers()
            }
          } else {
            logger.info("AWSWorkerControl - not running check as we are not running on the oldest instance")
          }
        }))

      case None =>
        logger.warn("Missing aws.workerAutoScalingGroupName setting, cannot automatically update workers")
    }
  }

  override def stop(): Future[Unit] = {
    timerHandler.foreach(_.cancel())
    Future.successful(())
  }

  /**
    * True when this instance is the oldest running instance (by launch time), or
    * when no instances are visible at all (fail open so the check still runs).
    *
    * NOTE(review): this looks up app = "pfi" whereas worker discovery uses
    * "pfi-worker" — presumably the control loop runs on the main app instances;
    * confirm this is intentional.
    */
  private def runningOnOldestInstance()(implicit ec: ExecutionContext): Boolean = {
    val myInstanceId = EC2MetadataUtils.getInstanceId
    val otherInstances = AwsDiscovery.findRunningInstances(discoveryConfig.stack, app = "pfi", discoveryConfig.stage, ec2)
    val oldestInstance = otherInstances.toList.sortBy(_.getLaunchTime).headOption.map(_.getInstanceId)

    oldestInstance.isEmpty || oldestInstance.contains(myInstanceId)
  }

  /**
    * Reads the current scaling state, asks [[AWSWorkerControl.decideOperation]]
    * what to do, and adjusts the ASG desired capacity by one (within min/max
    * bounds). Failures are logged and swallowed so the next tick retries.
    */
  private def scaleUpOrDownIfNeeded(workerAutoScalingGroupName: String)(implicit ec: ExecutionContext): Unit = {
    getCurrentState(workerAutoScalingGroupName).flatMap { state =>
      val operation = AWSWorkerControl.decideOperation(state, Instant.now().toEpochMilli, config.controlCooldown.toMillis)
      logger.info(s"AWSWorkerControl desiredNumberOfWorkers: ${state.desiredNumberOfWorkers}, inProgress: ${state.inProgress}, outstandingFromIngestStore: ${state.outstandingFromIngestStore}, outstandingFromTodos: ${state.outstandingFromTodos} lastEventTime: ${new Date(state.lastEventTime)}, minimumNumberOfWorkers: ${state.minimumNumberOfWorkers}, maximumNumberOfWorkers: ${state.maximumNumberOfWorkers}, operation: $operation")

      operation match {
        // Guards re-check the bounds so we never scale outside [min, max]
        case Some(AddNewWorker) if state.desiredNumberOfWorkers < state.maximumNumberOfWorkers =>
          setNumberOfWorkers(state.desiredNumberOfWorkers + 1, workerAutoScalingGroupName)

        case Some(RemoveWorker) if state.desiredNumberOfWorkers > state.minimumNumberOfWorkers =>
          setNumberOfWorkers(state.desiredNumberOfWorkers - 1, workerAutoScalingGroupName)

        case _ =>
          Attempt.Right(())
      }
    }.recoverWith {
      case f: Failure =>
        logger.warn(s"Worker control update failed. Retry in ${config.controlInterval.toSeconds} seconds", f.toThrowable)
        Attempt.Right(())
    }
  }

  /** Assembles the inputs to a scaling decision from the ASG, manifest and ingest bucket. */
  private def getCurrentState(workerAutoScalingGroupName: String)(implicit ec: ExecutionContext): Attempt[AWSWorkerControl.State] = {
    for {
      workerAutoScalingGroup <- getAutoScalingGroup(workerAutoScalingGroupName)
      desiredNumberOfWorkers = workerAutoScalingGroup.getDesiredCapacity.intValue()
      minimumNumberOfWorkers = workerAutoScalingGroup.getMinSize.intValue()
      // This is not the same as the max in the auto-scaling group as we
      // need to maintain headroom to accommodate new workers during a
      // deploy even if we're scaled up due to the number of tasks
      maximumNumberOfWorkers = Math.max(workerAutoScalingGroup.getMinSize, workerAutoScalingGroup.getMaxSize / 2)
      lastEventTime <- getLastEventTime(workerAutoScalingGroup.getAutoScalingGroupName)
      extractorWorkCounts <- Attempt.fromEither(manifest.getWorkCounts())
      filesLeftInS3UploadBucket <- Attempt.fromEither(ingestStorage.list)
    } yield {
      AWSWorkerControl.State(desiredNumberOfWorkers, extractorWorkCounts.inProgress, filesLeftInS3UploadBucket.size, extractorWorkCounts.outstanding, lastEventTime, minimumNumberOfWorkers, maximumNumberOfWorkers)
    }
  }

  /** Looks up the named auto-scaling group, failing if it does not exist. */
  private def getAutoScalingGroup(workerAutoScalingGroupName: String)(implicit ec: ExecutionContext): Attempt[AutoScalingGroup] = {
    Attempt.catchNonFatalBlasé {
      val request = new DescribeAutoScalingGroupsRequest()
        .withAutoScalingGroupNames(workerAutoScalingGroupName)

      autoscaling.describeAutoScalingGroups(request)
    }.flatMap { response =>
      response.getAutoScalingGroups.asScala.headOption match {
        case Some(asg) =>
          Attempt.Right(asg)

        case None =>
          Attempt.Left(IllegalStateFailure(s"Could not find worker auto-scaling group $workerAutoScalingGroupName"))
      }
    }
  }

  /** Start time (epoch millis) of the most recent scaling activity on the group. */
  private def getLastEventTime(workerAutoScalingGroupName: String)(implicit ec: ExecutionContext): Attempt[Long] = {
    Attempt.catchNonFatalBlasé {
      val request = new DescribeScalingActivitiesRequest()
        .withAutoScalingGroupName(workerAutoScalingGroupName)

      autoscaling.describeScalingActivities(request)
    }.flatMap { response =>
      // events are returned with the latest first
      response.getActivities.asScala.headOption match {
        case Some(event) =>
          Attempt.Right(event.getStartTime.getTime)

        case None =>
          Attempt.Left(IllegalStateFailure(s"No events on autoscaling group $workerAutoScalingGroupName"))
      }
    }
  }

  /** Sets the ASG desired capacity, which triggers AWS to add or remove instances. */
  private def setNumberOfWorkers(numberOfWorkers: Int, workerAutoScalingGroupName: String): Attempt[Unit] = {
    Attempt.catchNonFatalBlasé {
      logger.info(s"Worker control updating number of workers in $workerAutoScalingGroupName to $numberOfWorkers")

      val request = new SetDesiredCapacityRequest()
        .withAutoScalingGroupName(workerAutoScalingGroupName)
        .withDesiredCapacity(numberOfWorkers)

      autoscaling.setDesiredCapacity(request)
    }
  }

  /** Releases manifest locks held by workers whose instances are no longer running. */
  private def breakLocksOnTerminatedWorkers(): Attempt[Unit] = {
    Attempt.catchNonFatalBlasé {
      val runningWorkers = AwsDiscovery.findRunningInstances(discoveryConfig.stack, app = "pfi-worker", discoveryConfig.stage, ec2)
      val runningInstanceIds = runningWorkers.map(_.getInstanceId).toList

      logger.info(s"Breaking locks for any worker not running. Running: [${runningInstanceIds.mkString(", ")}]")
      manifest.releaseLocksForTerminatedWorkers(runningInstanceIds)
    }
  }
}

object AWSWorkerControl {
  /**
    * Inputs to a scaling decision.
    *
    * @param desiredNumberOfWorkers    current ASG desired capacity
    * @param inProgress                extractions currently being worked on
    * @param outstandingFromIngestStore files still waiting in the S3 upload bucket
    * @param outstandingFromTodos      extractions queued in the manifest
    * @param lastEventTime             epoch millis of the last ASG scaling activity
    * @param minimumNumberOfWorkers    lower bound on desired capacity
    * @param maximumNumberOfWorkers    upper bound on desired capacity (with deploy headroom)
    */
  case class State(
    desiredNumberOfWorkers: Int,
    inProgress: Int,
    outstandingFromIngestStore: Int,
    outstandingFromTodos: Int,
    lastEventTime: Long,
    minimumNumberOfWorkers: Int,
    maximumNumberOfWorkers: Int
  )

  sealed trait Operation
  case object AddNewWorker extends Operation
  case object RemoveWorker extends Operation

  /**
    * Pure scaling policy: no-op while in cooldown or manually scaled to zero;
    * add a worker when work is outstanding and we're below max; remove one when
    * the system is idle and we're above min.
    */
  def decideOperation(state: State, now: Long, cooldown: Long): Option[Operation] = {
    val inCooldown = state.lastEventTime > (now - cooldown)
    val manuallyScaledDown = state.desiredNumberOfWorkers == 0
    val outstandingInTotal = state.outstandingFromIngestStore + state.outstandingFromTodos

    if(inCooldown || manuallyScaledDown) {
      None
    } else if(outstandingInTotal > 0 && state.desiredNumberOfWorkers < state.maximumNumberOfWorkers) {
      Some(AddNewWorker)
    } else if(outstandingInTotal == 0 && state.inProgress == 0 && state.desiredNumberOfWorkers > state.minimumNumberOfWorkers) {
      Some(RemoveWorker)
    } else {
      None
    }
  }
}