app/controllers/ProxyFrameworkAdminController.scala (215 lines of code) (raw):

package controllers import java.time.{Instant, ZoneId, ZonedDateTime} import java.util.Date import akka.actor.{ActorRef, ActorSystem} import auth.{BearerTokenAuth, Security} import com.amazonaws.auth.profile.ProfileCredentialsProvider import com.amazonaws.auth.{AWSCredentialsProviderChain, ContainerCredentialsProvider, InstanceProfileCredentialsProvider} import com.amazonaws.internal.CredentialsEndpointProvider import com.amazonaws.regions.{Region, Regions} import com.amazonaws.services.cloudformation.AmazonCloudFormationClientBuilder import com.amazonaws.services.cloudformation.model._ import com.theguardian.multimedia.archivehunter.common.cmn_models.{ProxyFrameworkInstance, ProxyFrameworkInstanceDAO} import javax.inject.{Inject, Singleton} import play.api.{Configuration, Logger} import play.api.libs.circe.Circe import play.api.mvc.{AbstractController, ControllerComponents} import io.circe.syntax._ import io.circe.generic.auto._ import org.slf4j.LoggerFactory import play.api.cache.SyncCacheApi import requests.AddPFDeploymentRequest import responses.{GenericErrorResponse, MultiResultResponse, ObjectListResponse, ProxyFrameworkDeploymentInfo} import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} @Singleton class ProxyFrameworkAdminController @Inject() (override val config:Configuration, override val controllerComponents:ControllerComponents, override val bearerTokenAuth:BearerTokenAuth, override val cache:SyncCacheApi, proxyFrameworkInstanceDAO: ProxyFrameworkInstanceDAO, proxyFrameworkHelper: helpers.ProxyFramework) (implicit actorSystem:ActorSystem) extends AbstractController(controllerComponents) with Circe with Security { override protected val logger=LoggerFactory.getLogger(getClass) implicit val ec:ExecutionContext = controllerComponents.executionContext private val awsProfile = config.getOptional[String]("externalData.awsProfile") def existingDeployments = IsAdminAsync { _=> request=> proxyFrameworkInstanceDAO.allRecords.map(result=>{ val failures = result.collect({case Left(err)=>err}) if(failures.nonEmpty){ InternalServerError(GenericErrorResponse("db_error",failures.head.toString).asJson) } else { val records = result.collect({case Right(record)=>record}) Ok(ObjectListResponse("ok","ProxyFrameworkInstance",records,records.length).asJson) } }) } def credentialsProvider(profileName:Option[String]=None) = new AWSCredentialsProviderChain( new ProfileCredentialsProvider(profileName.getOrElse("default")), InstanceProfileCredentialsProvider.getInstance() ) protected def getCfClient(region:String) = AmazonCloudFormationClientBuilder.standard() .withCredentials(credentialsProvider(awsProfile)) .withRegion(region).build() /** * (asynchronously) scan cloudformation within the given region for deployments of the proxy framework * @param region String of the region to search * @return a Future containing a Sequence of StackSummary objects along with the region name */ def scanRegionForDeployments(region:String):Future[(String,Try[Seq[StackSummary]])] = Future { val searchParam = config.getOptional[String]("proxyFramework.descriptionSearch").getOrElse("Proxying framework for ArchiveHunter") val cfClient = getCfClient(region) logger.info(s"Looking for deployments in region $region based on a template description of '$searchParam'") /** * recursively get stack information from CF * @param rq ListStacksRequest instance * @param continuationToken Optional continuation token, for recursion. Don't specify when calling * @param currentValues Initial set of StackSummary. Don't specify when calling * @return a Seq of StackSummaries containing info about relevant stacks */ def getNextPage(rq:ListStacksRequest, continuationToken:Option[String]=None, currentValues:Seq[StackSummary]=Seq()):Seq[StackSummary] = { val finalRq = continuationToken match { case Some(tok)=>rq.withNextToken(tok) case None=>rq } logger.debug(s"getting next page of results, continuation token is $continuationToken") val result = cfClient.listStacks(finalRq) logger.debug(s"results page: ${result.getStackSummaries.asScala}") Option(result.getNextToken) match { case Some(tok)=> logger.debug(s"Got continuation token $tok, recursing...") getNextPage(rq, Some(tok), currentValues ++ result.getStackSummaries.asScala.filter(_.getTemplateDescription.startsWith(searchParam))) case None=> logger.debug(s"No continuation token, reached end of results.") val data = Option(result.getStackSummaries) match { case Some(list) => list.asScala.filter(summ=> Option(summ.getTemplateDescription).isDefined && summ.getTemplateDescription.startsWith(searchParam) ) case None => Seq() } currentValues ++ data } } logger.debug("Looking for stacks with CREATE_COMPLETE,UPDATE_COMPLETE or UPDATE_IN_PROGRESS") try { val baseRq = new ListStacksRequest().withStackStatusFilters(Seq("CREATE_COMPLETE", "UPDATE_COMPLETE", "UPDATE_IN_PROGRESS").asJavaCollection) val results = getNextPage(baseRq) logger.debug(s"Got final results $results") (region, Success(results)) } catch { case err:Throwable=> logger.error(s"Could not list stacks from $region", err) (region, Failure(err)) } } /** * endpoint that returns the known valid regions * @return */ def getRegions = IsAuthenticated { _=> _=> Ok(ObjectListResponse("ok","regions",Regions.values().map(_.toString.replace("_","-").toLowerCase),Regions.values().length).asJson) } def convertJavaDate(date:Date) = ZonedDateTime.ofInstant(Instant.ofEpochMilli(date.getTime),ZoneId.of("UTC")) /** * scans for anything that looks like a deployment of the Proxy Framework in the running AWS account * @return */ def lookupPotentialDeployments = IsAdminAsync { _=> request=> val lookupFutures = Regions.values().map(rgn=>scanRegionForDeployments(rgn.getName)).toSeq val resultsFuture = Future.sequence(lookupFutures).map(resultList=>{ resultList.map(resultTuple=>{ resultTuple._2 match { case Success(summarySeq)=> Right(summarySeq.map(summ=> ProxyFrameworkDeploymentInfo( resultTuple._1, summ.getStackId,summ.getStackName,summ.getStackStatus, summ.getTemplateDescription,convertJavaDate(summ.getCreationTime)) )) case Failure(err)=> Left((resultTuple._1, err.toString)) } }) }) resultsFuture.map(results=>{ val successes = results.collect({ case Right(info) => info }) val failures = results.collect({ case Left(err) => err }) val statusString = if(successes.isEmpty){ "failure" } else if(failures.isEmpty){ "success" } else { "partial" } Ok(MultiResultResponse(statusString, "proxyFrameworkDeploymentInfo", successes, failures).asJson) }) } def addDeployment = IsAdminAsync(circe.json(2048)) { _=> request=> request.body.as[AddPFDeploymentRequest].fold( err=>Future(BadRequest(GenericErrorResponse("bad_request",err.toString).asJson)), clientRequest=> { val client = getCfClient(clientRequest.region) val rq = new DescribeStacksRequest().withStackName(clientRequest.stackName) val result = client.describeStacks(rq) val stacks = result.getStacks.asScala if(stacks.isEmpty){ Future(NotFound(GenericErrorResponse("not_found","No stack by that name in that region").asJson)) } else if(stacks.length>1){ Future(BadRequest(GenericErrorResponse("too_many","Multiple stacks found by that name").asJson)) } else { ProxyFrameworkInstance.fromStackSummary(clientRequest.region, stacks.head) match { case Some(rec) => //setupDeployment saves the created record to the DB and performs subscriptions/security policy updates proxyFrameworkHelper.setupDeployment(rec).map({ case Success(results) => Ok(GenericErrorResponse("ok", "Record saved").asJson) case Failure(err) => InternalServerError(GenericErrorResponse("db_error", err.toString).asJson) }) case None => Future(BadRequest(GenericErrorResponse("invalid_request", "Stack did not have the right outputs defined").asJson)) } } }.recover({ case err:Throwable=> logger.error("Could not add new deployment from CF stack: ", err) InternalServerError(GenericErrorResponse("error", err.toString).asJson) }) ) } def addDeploymentDirect = IsAdminAsync(circe.json(2048)) { _=> request=> request.body.as[ProxyFrameworkInstance].fold( err=>Future(BadRequest(GenericErrorResponse("bad_request", err.toString).asJson)), rec=>proxyFrameworkInstanceDAO.recordsForRegion(rec.region).flatMap(results=>{ if(results.nonEmpty){ Future(Conflict(GenericErrorResponse("already_exists", "A record already exists for this region").asJson)) } else { proxyFrameworkInstanceDAO .put(rec) .map(_=> Ok(GenericErrorResponse("ok","Record saved").asJson) ).recover({ case err:Throwable=> logger.error(s"Could not update proxy framework deployment: ${err.getMessage}", err) InternalServerError(GenericErrorResponse("db_error",err.getMessage).asJson) }) } }) ) } def removeDeployment(forRegion:String) = IsAdminAsync { _=> request=> proxyFrameworkInstanceDAO.recordsForRegion(forRegion).flatMap(results=>{ val failures = results.collect({case Left(err)=>err}) if(failures.nonEmpty){ logger.error(s"Could not locate deployment to remove: $failures") Future(InternalServerError(GenericErrorResponse("error", failures.head.toString).asJson)) } else { val frameworks = results.collect({case Right(pt)=>pt}) if(frameworks.isEmpty){ logger.error(s"Could not find any frameworks for region $forRegion") Future(NotFound(GenericErrorResponse("error", s"Could not find any frameworks for region $forRegion").asJson)) } else { logger.info(s"Attempting to detach framework instance ${frameworks.head}...") proxyFrameworkHelper.detachFramework(frameworks.head).flatMap({ case Success(_)=> logger.info(s"Detach succeeded. Deleting reference...") proxyFrameworkInstanceDAO.remove(forRegion) .map(result=> { logger.info("Reference deleted from database; remove deployment complete") Ok(GenericErrorResponse("ok", "Record deleted").asJson) }) .recoverWith({ case err:Throwable=> logger.error("Could not remove reference from database: ", err) Future(InternalServerError(GenericErrorResponse("db_error",err.toString).asJson)) }) case Failure(err)=> logger.error(s"Could not detech framework instance ${frameworks.head}: ${err.getMessage}", err) Future(InternalServerError(GenericErrorResponse("error", err.toString).asJson)) }) } } }) } }