def kclConfig()

in src/it/scala/com/gu/kinesis/TestStreamConfig.scala [33:57]


  def kclConfig(workerId: String): ConsumerConfig = {
    val kinesisClient =
      KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(Region.of(regionName)))
    val initialPositionInStream =
      InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
    ConsumerConfig(
      streamName,
      applicationName,
      workerId,
      initialPositionInStream,
      coordinatorConfig = Some(
        new CoordinatorConfig(applicationName)
          .shardConsumerDispatchPollIntervalMillis(idleTimeBetweenGetRecords.toMillis)
      ),
      retrievalConfig = Some(
        new RetrievalConfig(kinesisClient, streamName, applicationName)
          .retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
          .initialPositionInStreamExtended(initialPositionInStream)
      )
    )(
      kinesisClient,
      DynamoDbAsyncClient.builder.region(Region.of(regionName)).build(),
      CloudWatchAsyncClient.builder.region(Region.of(regionName)).build()
    )
  }