app/helpers/ProxyFramework.scala (160 lines of code) (raw):
package helpers
import com.amazonaws.services.securitytoken.AWSSecurityTokenService
import com.amazonaws.services.sns.model.{SubscribeRequest, UnsubscribeRequest}
import com.amazonaws.services.sqs.model.SetQueueAttributesRequest
import com.theguardian.multimedia.archivehunter.common.clientManagers.{SNSClientManager, SQSClientManager, STSClientManager}
import com.theguardian.multimedia.archivehunter.common.cmn_models.{ProxyFrameworkInstance, ProxyFrameworkInstanceDAO}
import io.circe.Error
import javax.inject.Inject
import models._
import play.api.{Configuration, Logger}
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.collection.JavaConverters._
import io.circe.syntax._
import io.circe.generic.auto._
import io.circe.Printer
/**
* injectable helper class that contains operations for managing ProxyFramework instances
* @param config
* @param sqsClientManager
*/
class ProxyFramework @Inject()(config:Configuration,
sqsClientManager: SQSClientManager,
stsClientManager:STSClientManager,
snsClientManager:SNSClientManager,
proxyFrameworkInstanceDAO:ProxyFrameworkInstanceDAO) extends AwsSqsPolicyDecoder {
private val logger = Logger(getClass)
private val awsProfile = config.getOptional[String]("externalData.awsProfile")
private val sqsClient = sqsClientManager.getClient(awsProfile)
private val mainAppQueueArn = config.get[String]("proxyFramework.notificationsQueueArn")
private val mainAppQueueUrl = config.get[String]("proxyFramework.notificationsQueue")
private val roleDuration = config.getOptional[Int]("proxyFramework.roleDuration").getOrElse(900) //900 is the minimum value
protected def getStsClient(region:String):AWSSecurityTokenService = stsClientManager.getClientForRegion(awsProfile, region)
/**
* try to "attach" the given ProxyFrameworkInstance. This entails subscribing the main app's transcode message queue
* onto the ProxyFrameworkInstance's output topic, and adding the topic to the queue's security policy.
* These operations are performed asynchronously in parallel. On failure of either, we attempt rollback of BOTH results.
* @param f ProxyFrameworkInstance to attach to
* @return a Future, containing a Try indicating success or failure. On success, a tuple of (subscriptionId, policyResult) strings.
*/
def attachFramework(f:ProxyFrameworkInstance) =
Future.sequence(Seq(makeSubscription(f), addToPolicy(f))).map(result=>Success((result.head, result(1)))).recoverWith({
case err:Throwable=>
logger.error(s"Could not attach to framework $f:", err)
Future.sequence(Seq(removeSubscription(f), removeFromPolicy(f)))
.map(results=>Failure(err))
.recover({
case rollbackErr:Throwable=>
logger.error(s"Could not fully roll back framework attach: ", rollbackErr)
Failure(err)
})
})
/**
* try to "detach" the give ProxyFrameworkInstance. This entains unsubscribing the main apps transcode message qyeye
* from the ProxyFrameworkInstance's output topic, and removing the topic from the queue's security policy.
* These operations are performed asynchronously in parallel. On failure, no rollback is attempted; it should be safe to
* re-try the operation.
* @param f ProxyFrameworkInstance to detach from
* @return a Future, containing a Try indicating success or failure. On success, a tuple of (boolean, policyResult) is returned;
* the boolean indicates whether there was anything to unsubscribe or not.
*/
def detachFramework(f:ProxyFrameworkInstance) =
Future.sequence(Seq(removeSubscription(f), removeFromPolicy(f)))
.map(result=>Success((result.head.asInstanceOf[Boolean], result(1).asInstanceOf[String])))
.recover({
case err:Throwable=>
logger.error(s"Could not detach framework $f", err)
Failure(err)
})
/**
* uses the provided admin role to subscribe the main app's message queue to the framework deployment's output topic
* @param f ProxyFrameworkInstance
* @return a Future containing the Subscription ID of the subscription that was made
*/
def makeSubscription(f:ProxyFrameworkInstance):Future[String] = Future {
logger.info(s"Performing subscription from ${f.outputTopicArn} to $mainAppQueueArn")
implicit val stsClient = getStsClient(f.region)
snsClientManager.getTemporaryClient(f.region, f.roleArn) match {
case Success(snsClient) =>
val rq = new SubscribeRequest()
.withTopicArn(f.outputTopicArn)
.withProtocol("sqs")
.withEndpoint(mainAppQueueArn)
val result = snsClient.subscribe(rq)
result.getSubscriptionArn
case Failure(err) =>
logger.error(s"Could not connect to provided role ${f.roleArn}", err)
throw err
}
}
/**
* gets the current policy on our queue.
* @return None if there is no existing policy. Left with an error if the policy Json failed to parse; Right with the policy
* represented as a AwsSqsPolicy object if it parses properly.
*/
def getCurrentPolicy:Future[Option[Either[Error,AwsSqsPolicy]]] = Future {
val result = sqsClient.getQueueAttributes(mainAppQueueUrl,List("Policy").asJava)
result.getAttributes.asScala.get("Policy").map(policyStr=>{
logger.debug(s"Policy string: $policyStr")
io.circe.parser.parse(policyStr).flatMap(_.as[AwsSqsPolicy])
})
}
/**
* adds the output topic of the provided ProxyFrameworkInstance to the queue policy of our queue
* @param f ProxyFrameworkInstance describing the instance we're attaching to
* @return a Future containing a String indicating success. The Future fails on error; use recover() or recoverWith() to catch this
*/
def addToPolicy(f:ProxyFrameworkInstance):Future[String] = {
logger.info(s"Adding ${f.outputTopicArn} to queue policy for $mainAppQueueArn")
val newPolicyFuture = getCurrentPolicy.map({
case None=> //no policy exists yet
AwsSqsPolicy.createNew(Seq(
AwsSqsPolicyStatement.forInputOutput(mainAppQueueArn,f.outputTopicArn)
))
case Some(Left(err))=>
logger.error(s"Could not parse existing queue policy for $mainAppQueueArn: ${err.toString}")
throw new RuntimeException(s"Could not parse existing queue policy for $mainAppQueueArn: ${err.toString}")
case Some(Right(policy))=>
policy.withNewStatement(AwsSqsPolicyStatement.forInputOutput(mainAppQueueArn,f.outputTopicArn))
})
newPolicyFuture.map(newPolicy=>{
val str = newPolicy.asJson.noSpaces
logger.debug(s"Updating policy of $mainAppQueueUrl to $str")
val rq = new SetQueueAttributesRequest().withQueueUrl(mainAppQueueUrl).withAttributes(Map("Policy"->str).asJava)
val result = sqsClient.setQueueAttributes(rq)
result.toString
})
}
/**
* removes the output topic of the provided ProxyFrameworkInstance from the queue policy of our queue
* @param f ProxyFrameworkInstance describing the instance we're detaching from
* @return a Future containing a String indicating success. The Future fails on error; use recover() or recoverWith to catch this.
*/
def removeFromPolicy(f:ProxyFrameworkInstance):Future[String] = {
logger.info(s"Removing $f from the policy of $mainAppQueueArn")
val newPolicyFuture = getCurrentPolicy.map({
case None=> //no policy exists yet, so nothing needs to be done
None
case Some(Left(err))=>
logger.error(s"Could not parse existing queue policy for $mainAppQueueArn: ${err.toString}")
throw new RuntimeException(s"Could not parse existing queue policy for $mainAppQueueArn: ${err.toString}")
case Some(Right(policy))=>
Some(policy.withoutStatement(AwsSqsPolicyStatement.forInputOutput(mainAppQueueArn, f.outputTopicArn)))
})
newPolicyFuture.map(newPolicy=>{
val str = newPolicy.asJson.noSpaces
logger.debug(s"Updating policy of $mainAppQueueUrl to $str")
val rq = new SetQueueAttributesRequest().withQueueUrl(mainAppQueueUrl).withAttributes(Map("Policy" -> str).asJava)
val result = sqsClient.setQueueAttributes(rq)
result.toString
})
}
/**
* uses the provided admin role to unsubscribe the main app's message queue from the framework deployment's output topic
* @param f
* @return
*/
def removeSubscription(f:ProxyFrameworkInstance):Future[Boolean] = Future {
f.subscriptionId match {
case None=>
logger.error(s"ProxyFramework $f does not have a subscription ID so I can't disconnect")
false
case Some(subId)=>
logger.info(s"Unsubscribing ID $subId for $f")
implicit val stsClient = stsClientManager.getClientForRegion(awsProfile, f.region)
snsClientManager.getTemporaryClient(f.region, f.roleArn) match {
case Success(snsClient) =>
val rq = new UnsubscribeRequest().withSubscriptionArn(subId)
val result = snsClient.unsubscribe(rq)
true
case Failure(err) =>
logger.error(s"Could not assume role ${f.roleArn} to remove subscription", err)
throw err
}
}
}
/**
* perform setup of a new stack, as called from the controller. This involves saving the record to database and then
* attaching the topic to the queue
* @param clientRequest request object from the client detailing the stack we are trying to connect
* @param stack CloudFormation's description of the actual stack
* @return None if the stack could not be found. Otherwise, a Future containing a Try
* with either a tuple of (subscription ID, policyResult) or an error describing what went wrong.
*/
def setupDeployment(rec:ProxyFrameworkInstance) = {
logger.debug(s"Got stack info: $rec")
proxyFrameworkInstanceDAO
.put(rec)
.flatMap(_=> {
logger.debug(s"Write succeeded, commencing setup...")
attachFramework(rec).flatMap({
case Success(results) =>
logger.debug(s"Subscription succeeded, updating record with subscription ID")
val updatedRec = rec.copy(subscriptionId = Some(results._1))
proxyFrameworkInstanceDAO
.put(updatedRec)
.map(_=>Success(results))
.recover({
case err:Throwable=>
logger.error(s"Can't write proxy framework instance to database: ${err.getMessage}", err)
Failure(err)
})
case Failure(err) =>
logger.error("Failed to attach framework to app instance")
Future(Failure(err))
})
}).recover({
case err:Throwable=>
logger.error(s"Can't set up proxy framework instance $rec: ${err.getMessage}", err)
Failure(err)
})
}
}