in src/main/scala/com/gu/kinesis/KinesisSource.scala [189:204]
/** Creates a record processor backed by its own freshly materialized queue stream.
  *
  * Every invocation runs a new stream: batches offered to the queue are flattened
  * into individual records and sent to `mergeSink`. The materialized queue is then
  * handed to a new [[RecordProcessorImpl]] together with the shared kill switch,
  * termination future, checkpoint configuration and stats collector.
  *
  * @return a new processor wired to the queue created by this call
  */
override def shardRecordProcessor(): ShardRecordProcessor = {
  // Zero-sized buffer combined with the backpressure overflow strategy.
  val batchSource =
    Source.queue[IndexedSeq[KinesisRecord]](bufferSize = 0, OverflowStrategy.backpressure)

  // Flatten each offered batch into single records before they reach mergeSink;
  // `to` keeps the source's materialized value, i.e. the queue itself.
  val shardQueue = batchSource
    .mapConcat(batch => batch.toIndexedSeq)
    .to(mergeSink)
    .run()

  new RecordProcessorImpl(
    kinesisAppId,
    streamKillSwitch,
    streamTerminationFuture,
    shardQueue,
    shardCheckpointConfig,
    consumerStats
  )
}