thrall/app/lib/ThrallConfig.scala (69 lines of code) (raw):
package lib
import com.gu.mediaservice.lib.aws.AwsClientV2BuilderUtils
import com.gu.mediaservice.lib.cleanup.ReapableEligibiltyResources
import com.gu.mediaservice.lib.config.{CommonConfigWithElastic, GridConfigResources, ReapableEligibilityLoader}
import com.gu.mediaservice.lib.elasticsearch.ReapableEligibility
import org.joda.time.DateTime
import org.joda.time.format.ISODateTimeFormat
import play.api.inject.ApplicationLifecycle
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.http.Protocol
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.cloudwatch.{CloudWatchAsyncClient, CloudWatchAsyncClientBuilder}
import software.amazon.awssdk.services.dynamodb.{DynamoDbAsyncClient, DynamoDbAsyncClientBuilder}
import software.amazon.awssdk.services.kinesis.{KinesisAsyncClient, KinesisAsyncClientBuilder}
import software.amazon.kinesis.metrics.MetricsLevel
import java.net.URI
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.language.postfixOps
case class KinesisReceiverConfig(
override val awsRegionV2: Region,
override val awsCredentialsV2: AwsCredentialsProvider,
override val awsLocalEndpointUri: Option[URI],
override val isDev: Boolean,
streamName: String,
rewindFrom: Option[DateTime],
metricsLevel: MetricsLevel = MetricsLevel.DETAILED
) extends AwsClientV2BuilderUtils {
lazy val kinesisClient: KinesisAsyncClient = {
val clientBuilder = withAWSCredentialsV2(KinesisAsyncClient.builder())
if (isDev) {
clientBuilder.httpClientBuilder(NettyNioAsyncHttpClient
.builder()
.protocol(Protocol.HTTP1_1))
}
clientBuilder.build()
}
lazy val dynamoClient: DynamoDbAsyncClient = withAWSCredentialsV2(DynamoDbAsyncClient.builder()).build()
lazy val cloudwatchClient: CloudWatchAsyncClient = withAWSCredentialsV2(CloudWatchAsyncClient.builder()).build()
}
object KinesisReceiverConfig {
def apply(streamName: String, rewindFrom: Option[DateTime], thrallConfig: ThrallConfig): KinesisReceiverConfig = KinesisReceiverConfig(
thrallConfig.awsRegionV2,
thrallConfig.awsCredentialsV2,
thrallConfig.awsLocalEndpointUri,
thrallConfig.isDev,
streamName,
rewindFrom
)
}
class ThrallConfig(resources: GridConfigResources) extends CommonConfigWithElastic(resources) {
val imageBucket: String = string("s3.image.bucket")
val thumbnailBucket: String = string("s3.thumb.bucket")
val maybeReaperBucket: Option[String] = stringOpt("s3.reaper.bucket")
val maybeReaperCountPerRun: Option[Int] = intOpt("reaper.countPerRun")
val metadataTopicArn: String = string("indexed.image.sns.topic.arn")
val rewindFrom: Option[DateTime] = stringOpt("thrall.kinesis.stream.rewindFrom").map(ISODateTimeFormat.dateTime.parseDateTime)
val lowPriorityRewindFrom: Option[DateTime] = stringOpt("thrall.kinesis.lowPriorityStream.rewindFrom").map(ISODateTimeFormat.dateTime.parseDateTime)
val isVersionedS3: Boolean = boolean("s3.image.versioned")
val projectionParallelism: Int = intDefault("thrall.projection.parallelism", 1)
val reaperInterval: FiniteDuration = intDefault("reaper.interval", 15) minutes
val hardReapImagesAge: Int = intDefault("reaper.hard.daysInSoftDelete", 14) // soft deleted images age to be hard deleted by Reaper Controller
def kinesisConfig: KinesisReceiverConfig = KinesisReceiverConfig(thrallKinesisStream, rewindFrom, this)
def kinesisLowPriorityConfig: KinesisReceiverConfig = KinesisReceiverConfig(thrallKinesisLowPriorityStream, lowPriorityRewindFrom, this)
def maybeReapableEligibilityClass(applicationLifecycle: ApplicationLifecycle): Option[ReapableEligibility] = {
val configLoader = ReapableEligibilityLoader.singletonConfigLoader(ReapableEligibiltyResources(this, resources.actorSystem), applicationLifecycle)
configuration.getOptional[ReapableEligibility]("reaper.provider")(configLoader)
}
}