app/services/FaciaPress.scala (115 lines of code) (raw):
package services
import com.amazonaws.regions.Regions
import com.amazonaws.services.sns.AmazonSNSAsyncClientBuilder
import com.amazonaws.services.sns.model.PublishResult
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import com.amazonaws.services.sqs.model.SendMessageResult
import com.gu.facia.api.models.faciapress.{
Draft,
FrontPath,
Live,
PressJob,
PressType
}
import conf.ApplicationConfiguration
import logging.Logging
import metrics.FaciaToolMetrics.{EnqueuePressFailure, EnqueuePressSuccess}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
import play.api.libs.json.Json
import play.api.libs.json.JsObject
case class PressCommand(
collectionIds: Set[String],
live: Boolean = false,
draft: Boolean = false,
forceConfigUpdate: Option[Boolean] = Option(false)
) {
def withPressLive(b: Boolean = true): PressCommand = this.copy(live = b)
def withPressDraft(b: Boolean = true): PressCommand = this.copy(draft = b)
def withForceConfigUpdate(b: Boolean = true): PressCommand =
this.copy(forceConfigUpdate = Option(b))
}
object PressCommand {
def forOneId(id: String): PressCommand = PressCommand(Set(id))
}
class FaciaPressTopic(val config: ApplicationConfiguration) {
val maybeTopic = config.faciatool.frontPressToolTopic map { topicArn =>
val credentials = config.aws.cmsFrontsAccountCredentials
JsonMessageTopic[PressJob](
AmazonSNSAsyncClientBuilder
.standard()
.withCredentials(credentials)
.withRegion(Regions.EU_WEST_1)
.build(),
topicArn
)
}
def publish(
job: PressJob,
collectionIds: Set[String] = Set()
): Future[PublishResult] = {
maybeTopic match {
case Some(topic) if collectionIds.nonEmpty =>
import SNSTopics._
val event = Json.toJson(job).as[JsObject] ++ JsObject(
Map("collectionIds" -> Json.toJson(collectionIds))
)
topic.client.publishMessageFuture(topic.topicArn, Json.stringify(event))
case Some(topic) =>
topic.send(job)
case None =>
Future.failed(new RuntimeException("Could not publish job."))
}
}
}
class FaciaPress(
val faciaPressTopic: FaciaPressTopic,
val configAgent: ConfigAgent
) extends Logging {
def press(pressCommand: PressCommand): Future[List[PublishResult]] = {
configAgent.refreshAndReturn() flatMap { _ =>
val paths: Set[String] = for {
id <- pressCommand.collectionIds
path <- configAgent.getConfigsUsingCollectionId(id)
} yield path
val pathToCollectionIdsLookup = configAgent.getConfigCollectionMap
def sendEvents(pressType: PressType) = Future.traverse(
paths.filter(_ =>
pressType match {
case Live => pressCommand.live
case Draft => pressCommand.draft
}
)
) { path =>
val event = PressJob(
FrontPath(path),
pressType,
forceConfigUpdate = pressCommand.forceConfigUpdate
)
val collectionIdsRelevantToPath =
pressCommand.collectionIds.filter(collectionId => {
pathToCollectionIdsLookup.get(path).exists(_.contains(collectionId))
})
val publishResultFuture =
faciaPressTopic.publish(event, collectionIdsRelevantToPath)
publishResultFuture.onComplete {
case Failure(error) =>
logger.error(
s"Error publishing to the SNS topic, $pressType event: $event",
error
)
EnqueuePressFailure.increment()
case Success(_) =>
logger.info(s"Published to the SNS topic, $pressType event: $event")
EnqueuePressSuccess.increment()
}
publishResultFuture
}
for {
live <- sendEvents(Live)
draft <- sendEvents(Draft)
} yield (live ++ draft).toList
}
}
}