app/services/AWS.scala (114 lines of code) (raw):

package services import java.nio.ByteBuffer import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider import com.amazonaws.auth.profile.ProfileCredentialsProvider import com.amazonaws.regions.{Region, Regions} import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder import com.amazonaws.services.dynamodbv2.document.DynamoDB import com.amazonaws.services.ec2.AmazonEC2ClientBuilder import com.amazonaws.services.ec2.model.{DescribeTagsRequest, Filter} import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder import com.amazonaws.services.s3.{AmazonS3Client, AmazonS3ClientBuilder} import com.amazonaws.services.sqs.AmazonSQSClientBuilder import com.amazonaws.util.EC2MetadataUtils import com.twitter.scrooge.ThriftStruct import play.api.Logging import services.AWS.region import scala.jdk.CollectionConverters._ object AWS { lazy val region = Region getRegion Regions.EU_WEST_1 lazy val EC2Client = AmazonEC2ClientBuilder .standard() .withRegion(region.getName) .build() lazy val CloudWatch = AmazonCloudWatchAsyncClientBuilder .standard() .withRegion(region.getName) .build() lazy val Kinesis = AmazonKinesisClientBuilder .standard() .withRegion(region.getName) .build() lazy val S3Client = AmazonS3ClientBuilder .standard() .withRegion(region.getName) .build() private lazy val frontendCredentialsProvider = Config().frontendBucketWriteRole.map( new STSAssumeRoleSessionCredentialsProvider.Builder(_, "tagManager").build() ) lazy val frontendStaticFilesS3Client = AmazonS3ClientBuilder .standard() .withCredentials(frontendCredentialsProvider.getOrElse(new ProfileCredentialsProvider("frontend"))) .withRegion(region.getName) .build() } trait AwsInstanceTags { lazy val instanceId = Option(EC2MetadataUtils.getInstanceId) def readTag(tagName: String) = { instanceId.flatMap { id => val tagsResult = AWS.EC2Client.describeTags( new DescribeTagsRequest().withFilters( new Filter("resource-type").withValues("instance"), new Filter("resource-id").withValues(id), new Filter("key").withValues(tagName) ) ) tagsResult.getTags.asScala.find(_.getKey == tagName).map(_.getValue) } } } object Dynamo { lazy val client = AmazonDynamoDBClientBuilder .standard() .withRegion(AWS.region.getName) .build() lazy val dynamoDb = new DynamoDB(client) lazy val tagTable = dynamoDb.getTable(Config().tagsTableName) lazy val sectionTable = dynamoDb.getTable(Config().sectionsTableName) lazy val sponsorshipTable = dynamoDb.getTable(Config().sponsorshipTableName) lazy val sequenceTable = dynamoDb.getTable(Config().sequenceTableName) lazy val jobTable = dynamoDb.getTable(Config().jobTableName) lazy val tagAuditTable = dynamoDb.getTable(Config().tagAuditTableName) lazy val sectionAuditTable = dynamoDb.getTable(Config().sectionAuditTableName) lazy val appAuditTable = dynamoDb.getTable(Config().appAuditTableName) lazy val pillarTable = dynamoDb.getTable(Config().pillarsTableName) lazy val pillarAuditTable = dynamoDb.getTable(Config().pillarsAuditTableName) lazy val reindexProgressTable = dynamoDb.getTable(Config().reindexProgressTableName) lazy val clusterStatusTable = dynamoDb.getTable(Config().clusterStatusTableName) lazy val referencesTypeTable = dynamoDb.getTable(Config().referencesTypeTableName) } object SQS { lazy val SQSClient = AmazonSQSClientBuilder .standard() .withRegion(region.getName) .build() lazy val jobQueue = new SQSQueue(Config().jobQueueName) } class KinesisStreamProducer(streamName: String, requireCompressionByte: Boolean = false) extends Logging { def publishUpdate(key: String, data: String): Unit = { publishUpdate(key, ByteBuffer.wrap(data.getBytes("UTF-8"))) } def publishUpdate(key: String, data: Array[Byte]): Unit = { publishUpdate(key, ByteBuffer.wrap(data)) } def publishUpdate(key: String, struct: ThriftStruct): Unit = { logger.info(s"Kinesis Producer publishUpdate for streamName: $streamName") val thriftKinesisEvent: Array[Byte] = ThriftSerializer.serializeToBytes(struct, requireCompressionByte) publishUpdate(key, ByteBuffer.wrap(thriftKinesisEvent)) } def publishUpdate(key: String, dataBuffer: ByteBuffer): Unit = { AWS.Kinesis.putRecord(streamName, dataBuffer, key) } } object KinesisStreams { lazy val tagUpdateStream = new KinesisStreamProducer(streamName = Config().tagUpdateStreamName, requireCompressionByte = true) lazy val sectionUpdateStream = new KinesisStreamProducer(streamName = Config().sectionUpdateStreamName, requireCompressionByte = true) lazy val reindexTagsStream = new KinesisStreamProducer(streamName = Config().reindexTagsStreamName, requireCompressionByte = true) lazy val reindexSectionsStream = new KinesisStreamProducer(streamName = Config().reindexSectionsStreamName, requireCompressionByte = true) lazy val taggingOperationsStream = new KinesisStreamProducer(streamName = Config().taggingOperationsStreamName) lazy val taggingOperationsReIndexStream = new KinesisStreamProducer(streamName = Config().taggingOperationsReIndexStreamName) lazy val commercialExpiryStream = new KinesisStreamProducer(streamName = Config().commercialExpiryStreamName) lazy val pillarUpdateStream = new KinesisStreamProducer(streamName = Config().pillarUpdateStreamName, requireCompressionByte = true) lazy val reindexPillarsStream = new KinesisStreamProducer(streamName = Config().reindexPillarsStreamName, requireCompressionByte = true) }