in src/main/scala/com/gu/kinesis/KinesisSource.scala [142:165]
/** Builds a worker through `workerFactory` and runs it on
  * `BlockingContext.KinesisWorkersSharedContext`.
  *
  * When `streamTerminationFuture` completes (successfully or with a failure),
  * the worker's `shutdownAndWait()` is invoked on that same context and its
  * outcome is propagated to the returned future.
  *
  * Any non-fatal exception raised while creating or running the worker is
  * handed to `streamKillSwitch.abort`. Once `run()` has exited, for whatever
  * reason, the kill switch is additionally aborted with an
  * `IllegalStateException`.
  * NOTE(review): this second abort is presumably ignored when the stream has
  * already terminated or been aborted; confirm against the KillSwitch contract.
  *
  * @param workerFactory           builds the worker from the processor factory and config
  * @param recordProcessorFactory  passed through to `workerFactory`
  * @param kclConfig               passed through to `workerFactory`
  * @param streamKillSwitch        aborted when the worker fails or stops running
  * @param streamTerminationFuture its completion triggers the worker shutdown
  * @return a future completed with `Done` once `shutdownAndWait()` has returned,
  *         or failed if that call throws or the worker could not be created;
  *         it is only completed after `streamTerminationFuture` completes
  */
private def createAndStartKclWorker(
  workerFactory: (ShardRecordProcessorFactory, ConsumerConfig) => ManagedWorker,
  recordProcessorFactory: ShardRecordProcessorFactory,
  kclConfig: ConsumerConfig,
  streamKillSwitch: KillSwitch,
  streamTerminationFuture: Future[Done]
): Future[Done] = {
  implicit val blockingContext: ExecutionContext = BlockingContext.KinesisWorkersSharedContext

  val shutdownCompleted = Promise[Done]()

  // Creates the worker, wires its shutdown to stream termination, then runs it.
  def runWorkerUntilStopped(): Unit = {
    // Creation failures are captured here and re-thrown by each `.get` below, so
    // they surface both in the shutdown future and in the caller's catch.
    val attemptedWorker = Try(workerFactory(recordProcessorFactory, kclConfig))

    // The callback must be registered before run(), which does not return
    // until the worker stops.
    streamTerminationFuture.onComplete { _ =>
      val shutdownResult = Future(attemptedWorker.get.shutdownAndWait()).map(_ => Done)
      shutdownCompleted.completeWith(shutdownResult)
    }

    attemptedWorker.get.run() // This call hijacks the thread.
  }

  Future {
    try {
      runWorkerUntilStopped()
    } catch {
      case NonFatal(failure) => streamKillSwitch.abort(failure)
    }
    // Reached whenever run() has exited or the worker failed to start.
    streamKillSwitch.abort(new IllegalStateException("Worker shutdown unexpectedly."))
  }

  shutdownCompleted.future
}