in src/it/scala/com/gu/kinesis/TestStreamConfig.scala [33:57]
/** Assembles the [[ConsumerConfig]] for a single worker of the test stream.
  *
  * The consumer starts from `TRIM_HORIZON`, retrieves records through a
  * [[PollingConfig]], and uses `idleTimeBetweenGetRecords` as the
  * shard-consumer dispatch poll interval. Kinesis, DynamoDB and CloudWatch
  * async clients are all created for the region named by `regionName`.
  *
  * @param workerId identifier forwarded to [[ConsumerConfig]] for this worker
  * @return a consumer configuration bound to freshly built AWS async clients
  */
def kclConfig(workerId: String): ConsumerConfig = {
  // Resolve the region once and share it between all three client builders.
  val region = Region.of(regionName)

  val kinesis =
    KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region))

  val startPosition =
    InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)

  val coordinator =
    new CoordinatorConfig(applicationName)
      .shardConsumerDispatchPollIntervalMillis(idleTimeBetweenGetRecords.toMillis)

  val retrieval =
    new RetrievalConfig(kinesis, streamName, applicationName)
      .retrievalSpecificConfig(new PollingConfig(streamName, kinesis))
      .initialPositionInStreamExtended(startPosition)

  // Built after the KCL configs, in the same order as they are passed below.
  val dynamoDb = DynamoDbAsyncClient.builder.region(region).build()
  val cloudWatch = CloudWatchAsyncClient.builder.region(region).build()

  ConsumerConfig(
    streamName,
    applicationName,
    workerId,
    startPosition,
    coordinatorConfig = Some(coordinator),
    retrievalConfig = Some(retrieval)
  )(kinesis, dynamoDb, cloudWatch)
}