in src/main/scala/com/gu/kinesis/KinesisSource.scala [63:95]
private[kinesis] def apply(
workerFactory: (ShardRecordProcessorFactory, ConsumerConfig) => ManagedWorker,
kclConfig: ConsumerConfig,
shardCheckpointConfig: ShardCheckpointConfig,
consumerStats: ConsumerStats
): Source[KinesisRecord, Future[Done]] = {
val kinesisAppId = KinesisAppId(kclConfig.streamName, kclConfig.appName)
MergeHub
.source[KinesisRecord](perProducerBufferSize = 1)
.viaMat(KillSwitches.single)(Keep.both)
.watchTermination()(Keep.both)
.mergeMat(MaterializerAsValue.source)(Keep.both)
.mapMaterializedValue {
case (((mergeSink, streamKillSwitch), streamTerminationFuture), materializerFuture) =>
materializerFuture.flatMap { implicit materializer =>
val processorFactory = new RecordProcessorFactoryImpl(
kinesisAppId,
streamKillSwitch,
streamTerminationFuture,
mergeSink,
shardCheckpointConfig,
consumerStats
)
createAndStartKclWorker(
workerFactory,
processorFactory,
kclConfig,
streamKillSwitch,
streamTerminationFuture
)
}(scala.concurrent.ExecutionContext.global)
}
}