usage/app/lib/CrierStreamReader.scala (111 lines of code) (raw):
package lib
import com.gu.mediaservice.lib.logging.GridLogging
import model.UsageGroupOps
import software.amazon.awssdk.auth.credentials.{AwsCredentialsProviderChain, DefaultCredentialsProvider, ProfileCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.sts.StsClient
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest
import software.amazon.kinesis.common.{ConfigsBuilder, InitialPositionInStream, InitialPositionInStreamExtended, KinesisClientUtil}
import software.amazon.kinesis.coordinator.CoordinatorConfig.ClientVersionConfig
import software.amazon.kinesis.coordinator.Scheduler
import software.amazon.kinesis.processor.{ShardRecordProcessor, ShardRecordProcessorFactory}
import software.amazon.kinesis.retrieval.polling.PollingConfig
import java.net.InetAddress
import java.util.UUID
import scala.annotation.nowarn
import scala.concurrent.ExecutionContext
// it's annoyingly hard to get the streamName out of the configsBuilder once built, so pass them around together
private case class ConfigsBuilderWithStreamName(configsBuilder: ConfigsBuilder, streamName: String)
class CrierStreamReader(
config: UsageConfig,
usageGroupOps: UsageGroupOps,
executionContext: ExecutionContext
) extends GridLogging {
private val region = Region.of(config.awsRegionName)
private lazy val workerId: String = InetAddress.getLocalHost.getCanonicalHostName + ":" + UUID.randomUUID()
private lazy val awsCredentialsProvider = DefaultCredentialsProvider.builder().profileName("media-service").build()
private lazy val stsClient = StsClient.builder().region(region).credentialsProvider(awsCredentialsProvider).build()
private lazy val sessionId: String = "session-" + Math.random()
private val initialPosition = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
private def kinesisCredentialsProvider(arn: String): AwsCredentialsProviderChain = {
val assumeRoleRequest = AssumeRoleRequest.builder().roleArn(arn).roleSessionName(sessionId).build()
AwsCredentialsProviderChain.of(
ProfileCredentialsProvider.create("capi"),
StsAssumeRoleCredentialsProvider.builder().refreshRequest(assumeRoleRequest).stsClient(stsClient).build()
)
}
private def kinesisClientLibConfig(processorFactory: ShardRecordProcessorFactory)
(kinesisReaderConfig: KinesisReaderConfig): ConfigsBuilderWithStreamName = {
val kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder()
.region(region)
.credentialsProvider(kinesisCredentialsProvider(kinesisReaderConfig.arn)))
val dynamoClient = DynamoDbAsyncClient.builder()
.region(region)
.credentialsProvider(awsCredentialsProvider)
.build()
val cloudwatchClient = CloudWatchAsyncClient.builder()
.region(region)
.credentialsProvider(awsCredentialsProvider)
.build()
ConfigsBuilderWithStreamName(
new ConfigsBuilder(
kinesisReaderConfig.streamName,
kinesisReaderConfig.appName,
kinesisClient,
dynamoClient,
cloudwatchClient,
workerId,
processorFactory
),
kinesisReaderConfig.streamName
)
}
@nowarn("cat=deprecation") // initialPositionInStreamExtended is deprecated, but the upgrade path is unclear
private def kinesisClientLibScheduler(configsBuilderAndStreamName: ConfigsBuilderWithStreamName): Scheduler = {
val ConfigsBuilderWithStreamName(configsBuilder, streamName) = configsBuilderAndStreamName
new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig()
.clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.initialPositionInStreamExtended(initialPosition)
.retrievalSpecificConfig(new PollingConfig(streamName, configsBuilder.kinesisClient())),
)
}
private val LiveEventProcessorFactory = new ShardRecordProcessorFactory {
override def shardRecordProcessor(): ShardRecordProcessor =
new CrierLiveEventProcessor(config, usageGroupOps)
}
private val PreviewEventProcessorFactory = new ShardRecordProcessorFactory {
override def shardRecordProcessor(): ShardRecordProcessor =
new CrierPreviewEventProcessor(config, usageGroupOps)
}
private lazy val liveConfig = config.liveKinesisReaderConfig
.map(kinesisClientLibConfig(LiveEventProcessorFactory))
private lazy val previewConfig = config.previewKinesisReaderConfig
.map(kinesisClientLibConfig(PreviewEventProcessorFactory))
private lazy val liveScheduler = liveConfig.map(kinesisClientLibScheduler)
private lazy val previewScheduler = previewConfig.map(kinesisClientLibScheduler)
def start(): Unit = {
logger.info("Trying to start Crier Stream Readers")
liveScheduler
.map(executionContext.execute)
.fold(
e => logger.error("No 'Crier Live Stream reader' thread to start", e),
_ => logger.info("Starting Crier Live Stream reader")
)
previewScheduler
.map(executionContext.execute)
.fold(
e => logger.error("No 'Crier Preview Stream reader' thread to start", e),
_ => logger.info("Starting Crier Preview Stream reader")
)
}
}