private[kinesis] def apply()

in src/main/scala/com/gu/kinesis/KinesisSource.scala [63:95]


  /**
    * Builds the `KinesisRecord` source and, on materialization, starts the KCL worker feeding it.
    *
    * The stream is a `MergeHub` source (`perProducerBufferSize = 1`) followed by a single kill
    * switch and a termination watcher, merged with `MaterializerAsValue.source`. When the stream
    * is materialized, the future obtained from `MaterializerAsValue.source` is awaited; its value
    * is bound as the implicit `materializer` while a `RecordProcessorFactoryImpl` is constructed
    * and handed to `createAndStartKclWorker`. The future returned by that call becomes the
    * materialized value of the resulting source.
    *
    * @param workerFactory         forwarded unchanged to `createAndStartKclWorker`
    * @param kclConfig             supplies `streamName` and `appName` for the `KinesisAppId` and
    *                              is forwarded to `createAndStartKclWorker`
    * @param shardCheckpointConfig forwarded to `RecordProcessorFactoryImpl`
    * @param consumerStats         forwarded to `RecordProcessorFactoryImpl`
    * @return a source of `KinesisRecord`s whose materialized value is the `Future[Done]` produced
    *         by `createAndStartKclWorker`
    */
  private[kinesis] def apply(
      workerFactory: (ShardRecordProcessorFactory, ConsumerConfig) => ManagedWorker,
      kclConfig: ConsumerConfig,
      shardCheckpointConfig: ShardCheckpointConfig,
      consumerStats: ConsumerStats
  ): Source[KinesisRecord, Future[Done]] = {
    val appIdentity = KinesisAppId(kclConfig.streamName, kclConfig.appName)

    // Materialized value shape at this point:
    // (((hub sink, kill switch), termination future), materializer future)
    val wiredSource =
      MergeHub
        .source[KinesisRecord](perProducerBufferSize = 1)
        .viaMat(KillSwitches.single)(Keep.both)
        .watchTermination()(Keep.both)
        .mergeMat(MaterializerAsValue.source)(Keep.both)

    wiredSource.mapMaterializedValue { materialized =>
      val (((hubSink, killSwitch), terminated), eventualMaterializer) = materialized

      // The worker can only be created once the materializer future has completed; the
      // continuation is scheduled on the global execution context.
      eventualMaterializer.flatMap { implicit materializer =>
        val recordProcessors = new RecordProcessorFactoryImpl(
          appIdentity,
          killSwitch,
          terminated,
          hubSink,
          shardCheckpointConfig,
          consumerStats
        )
        createAndStartKclWorker(
          workerFactory,
          recordProcessors,
          kclConfig,
          killSwitch,
          terminated
        )
      }(scala.concurrent.ExecutionContext.global)
    }
  }