app/notification/SNS.scala (87 lines of code) (raw):

package notification

import com.amazonaws.services.sns.AmazonSNS
import com.amazonaws.services.sns.model._
import services.Loggable

import scala.annotation.tailrec
import scala.jdk.CollectionConverters._
import scala.concurrent.ExecutionContext
import scala.language.postfixOps

/** Stateless helpers for listing, creating and configuring SNS topics.
  *
  * Every operation that talks to AWS takes the [[AmazonSNS]] client as an
  * implicit parameter.
  */
object SNS extends Loggable {

  /** Label attached to the permission statement managed by
    * [[updatePermissions]]; the same label is used to remove the previous
    * statement and to add the new one.
    */
  private val PermissionLabel = "amigo_lambda_subs"

  /** Drains a token-paginated listing call into a single list.
    *
    * `request` is first invoked with `None`, then repeatedly with the token
    * returned by the previous invocation, until an invocation returns `None`
    * as its next token.
    *
    * @param request
    *   fetches one page: given an optional continuation token, returns that
    *   page's items and the token for the following page, if any
    * @tparam T
    *   the item type
    * @return
    *   the items of all pages, in the order the pages were fetched
    */
  def listAwsResource[T](
      request: Option[String] => (List[T], Option[String])
  ): List[T] = {
    // Accumulate in a builder: appending each page with `:::` would re-copy
    // everything collected so far on every page.
    val collected = List.newBuilder[T]

    @tailrec
    def fetchFrom(token: Option[String]): Unit = {
      val (page, nextToken) = request(token)
      collected ++= page
      if (nextToken.isDefined) fetchFrom(nextToken)
    }

    fetchFrom(None)
    collected.result()
  }

  /** Blocks the calling thread until `arn` appears in [[listTopicArns]],
    * re-listing every 500 ms and logging each retry.
    *
    * NOTE: there is no upper bound on the number of retries; if the topic
    * never shows up in the listing this call does not return.
    */
  @tailrec
  private def waitForTopicToBecomeAvailable(
      arn: String
  )(implicit client: AmazonSNS): Unit =
    if (!listTopicArns.contains(arn)) {
      log.info(s"Waiting for topic $arn to become available ...")
      Thread.sleep(500L)
      waitForTopicToBecomeAvailable(arn)
    }

  /** Lists the ARNs of all topics returned by the client's `listTopics`
    * call, following the pagination token until it is exhausted.
    */
  def listTopicArns(implicit client: AmazonSNS): List[String] =
    listAwsResource[Topic] { nextToken =>
      val result = client.listTopics(
        // The SDK request takes a nullable token; `None` becomes null.
        new ListTopicsRequest().withNextToken(nextToken.orNull)
      )
      // A null next token from the SDK is mapped to `None`, ending the paging.
      result.getTopics.asScala.toList -> Option(result.getNextToken)
    }.map(_.getTopicArn)

  /** Creates a topic named `topicName` and waits until it is visible in the
    * topic listing.
    *
    * @return
    *   the ARN reported by the create call
    */
  def createTopic(topicName: String)(implicit client: AmazonSNS): String = {
    val topicArn =
      client.createTopic(new CreateTopicRequest().withName(topicName)).getTopicArn
    waitForTopicToBecomeAvailable(topicArn)
    topicArn
  }

  /** Replaces the `amigo_lambda_subs` permission statement on a topic.
    *
    * The statement with that label is removed first and then re-added,
    * granting `Subscribe`, `ListSubscriptionsByTopic` and `Receive` to the
    * given accounts. The two calls are issued one after the other; they are
    * not atomic.
    *
    * @param topicArn
    *   ARN of the topic whose policy is updated
    * @param accounts
    *   AWS account ids that receive the permissions
    */
  def updatePermissions(topicArn: String, accounts: Seq[String])(implicit
      client: AmazonSNS
  ): Unit = {
    client.removePermission(
      new RemovePermissionRequest()
        .withTopicArn(topicArn)
        .withLabel(PermissionLabel)
    )

    client.addPermission(
      new AddPermissionRequest()
        .withTopicArn(topicArn)
        .withAWSAccountIds(accounts.asJava)
        .withActionNames("Subscribe", "ListSubscriptionsByTopic", "Receive")
        .withLabel(PermissionLabel)
    )
  }

  /** Returns the ARN of the topic called `topicName`, creating the topic when
    * no listed ARN ends with `:topicName`, and (re)applies the account
    * permissions in either case.
    *
    * @param topicName
    *   name of the topic to look up or create
    * @param accountNumbers
    *   AWS account ids passed on to [[updatePermissions]]
    * @return
    *   the ARN of the existing or newly created topic
    */
  def findOrCreateTopic(topicName: String, accountNumbers: Seq[String])(implicit
      client: AmazonSNS
  ): String = {
    val topicArn = listTopicArns
      .find(_.endsWith(s":$topicName"))
      .getOrElse(createTopic(topicName))
    updatePermissions(topicArn, accountNumbers)
    topicArn
  }
}

/** Resolves the notification topics for one stage.
  *
  * Both topics are looked up (or created) and have their permissions updated
  * eagerly, while the instance is being constructed, so construction performs
  * blocking AWS calls.
  *
  * @param sns
  *   client used for all SNS calls
  * @param stage
  *   stage name embedded in the topic names
  * @param accountNumbers
  *   AWS account ids granted access to both topics
  * @param exec
  *   execution context; not used by the code in this class
  */
class SNS(sns: AmazonSNS, stage: String, accountNumbers: Seq[String])(implicit
    exec: ExecutionContext
) {
  implicit val client: AmazonSNS = sns

  val topicName: String = s"amigo-$stage-notify"
  val topicArn: String = SNS.findOrCreateTopic(topicName, accountNumbers)

  val housekeepingTopicName: String = s"amigo-$stage-housekeeping-notify"
  val housekeepingTopicArn: String =
    SNS.findOrCreateTopic(housekeepingTopicName, accountNumbers)
}