in src/main/scala/com/gu/kinesis/KinesisSource.scala [97:140]
private[kinesis] def createKclWorker(
recordProcessorFactory: ShardRecordProcessorFactory,
config: ConsumerConfig
): ManagedWorker = {
val configsBuilder =
new ConfigsBuilder(
config.streamName,
config.appName,
config.kinesisClient,
config.dynamoClient,
config.cloudwatchClient,
config.workerId,
recordProcessorFactory
)
val checkpointConfig = configsBuilder.checkpointConfig()
val coordinatorConfig = config.coordinatorConfig.getOrElse(configsBuilder.coordinatorConfig())
.clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X)
val leaseManagementConfig = config.leaseManagementConfig.getOrElse(
configsBuilder
.leaseManagementConfig()
.billingMode(BillingMode.PAY_PER_REQUEST)
)
val lifecycleConfig = configsBuilder.lifecycleConfig()
val metricsConfig = config.metricsConfig.getOrElse(configsBuilder.metricsConfig())
val processorConfig = configsBuilder.processorConfig().callProcessRecordsEvenForEmptyRecordList(true)
val retrievalConfig = config.retrievalConfig.getOrElse {
new RetrievalConfig(config.kinesisClient, config.streamName, config.appName)
.retrievalSpecificConfig(new PollingConfig(config.streamName, config.kinesisClient))
.initialPositionInStreamExtended(config.initialPositionInStreamExtended)
}
new ManagedKinesisWorker(
new Scheduler(
checkpointConfig,
coordinatorConfig,
leaseManagementConfig,
lifecycleConfig,
metricsConfig,
processorConfig,
retrievalConfig
)
)
}