private def createAndStartKclWorker()

in src/main/scala/com/gu/kinesis/KinesisSource.scala [142:165]


  private def createAndStartKclWorker(
      workerFactory: (ShardRecordProcessorFactory, ConsumerConfig) => ManagedWorker,
      recordProcessorFactory: ShardRecordProcessorFactory,
      kclConfig: ConsumerConfig,
      streamKillSwitch: KillSwitch,
      streamTerminationFuture: Future[Done]
  ): Future[Done] = {
    implicit val blockingContext: ExecutionContext = BlockingContext.KinesisWorkersSharedContext
    val workerShutdownPromise = Promise[Done]()
    Future {
      try {
        val worker = Try(workerFactory(recordProcessorFactory, kclConfig))
        streamTerminationFuture.onComplete { _ =>
          val workerShutdownFuture = Future(worker.get.shutdownAndWait()).map(_ => Done)
          workerShutdownPromise.completeWith(workerShutdownFuture)
        }
        worker.get.run() // This call hijacks the thread.
      } catch {
        case NonFatal(e) => streamKillSwitch.abort(e)
      }
      streamKillSwitch.abort(new IllegalStateException("Worker shutdown unexpectedly."))
    }
    workerShutdownPromise.future
  }