private[kinesis] def createKclWorker()

in src/main/scala/com/gu/kinesis/KinesisSource.scala [97:140]


  /**
   * Builds a KCL `Scheduler` for the supplied consumer configuration and wraps it in a
   * [[ManagedKinesisWorker]].
   *
   * Coordinator, lease-management, metrics and retrieval settings are taken from `config`
   * when it provides them; otherwise a fallback is created here. Checkpoint, lifecycle and
   * processor settings always come from the `ConfigsBuilder`.
   *
   * @param recordProcessorFactory factory handed to the `ConfigsBuilder`
   * @param config                 stream/application names, AWS clients, worker id and
   *                               optional per-area configuration overrides
   * @return the scheduler wrapped as a [[ManagedWorker]]
   */
  private[kinesis] def createKclWorker(
      recordProcessorFactory: ShardRecordProcessorFactory,
      config: ConsumerConfig
  ): ManagedWorker = {
    val builder = new ConfigsBuilder(
      config.streamName,
      config.appName,
      config.kinesisClient,
      config.dynamoClient,
      config.cloudwatchClient,
      config.workerId,
      recordProcessorFactory
    )

    // Fallbacks are local defs so they are only evaluated when no override is supplied.
    def fallbackLeaseManagement =
      builder.leaseManagementConfig().billingMode(BillingMode.PAY_PER_REQUEST)

    def fallbackRetrieval =
      new RetrievalConfig(config.kinesisClient, config.streamName, config.appName)
        .retrievalSpecificConfig(new PollingConfig(config.streamName, config.kinesisClient))
        .initialPositionInStreamExtended(config.initialPositionInStreamExtended)

    val checkpoint = builder.checkpointConfig()

    // The client version setting is applied to whichever coordinator config is chosen,
    // including one supplied by the caller.
    val chosenCoordinator = config.coordinatorConfig.getOrElse(builder.coordinatorConfig())
    val coordinator =
      chosenCoordinator.clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X)

    val leaseManagement = config.leaseManagementConfig.getOrElse(fallbackLeaseManagement)
    val lifecycle = builder.lifecycleConfig()
    val metrics = config.metricsConfig.getOrElse(builder.metricsConfig())

    // Record processors are invoked even when a fetch returns an empty record list.
    val processor = builder.processorConfig().callProcessRecordsEvenForEmptyRecordList(true)

    val retrieval = config.retrievalConfig.getOrElse(fallbackRetrieval)

    val scheduler = new Scheduler(
      checkpoint,
      coordinator,
      leaseManagement,
      lifecycle,
      metrics,
      processor,
      retrieval
    )

    new ManagedKinesisWorker(scheduler)
  }