override def shardRecordProcessor()

in src/main/scala/com/gu/kinesis/KinesisSource.scala [189:204]


  override def shardRecordProcessor(): ShardRecordProcessor = {
    val queue = Source
      .queue[IndexedSeq[KinesisRecord]](bufferSize = 0, OverflowStrategy.backpressure)
      .mapConcat(_.toIndexedSeq)
      .to(mergeSink)
      .run()

    new RecordProcessorImpl(
      kinesisAppId,
      streamKillSwitch,
      streamTerminationFuture,
      queue,
      shardCheckpointConfig,
      consumerStats
    )
  }