in src/main/scala/com/gu/kinesis/RecordProcessorImpl.scala [160:183]
private def shutdown(shutdownInput: ShutdownInput): Unit = {
val shutdownReason = shutdownInput.shutdownReason()
shutdownReason match {
case ShutdownReason.LEASE_LOST =>
// Do nothing.
case ShutdownReason.SHARD_END =>
waitForInFlightRecordsOrTermination()
checkpointAndHandleErrors(shutdownInput.checkpointer(), shardEnd = true)
case ShutdownReason.REQUESTED =>
/* The shutdown can be requested due to a stream failure or downstream cancellation. In either of these cases,
* some records will never be processed, so we should not block waiting for all the records to complete.
* Additionally, when we know the stream failed, and is being torn down, it's pointless to wait for in-flight
* records to complete. */
waitForInFlightRecordsUnlessStreamFailed(shardCheckpointConfig.maxWaitForCompletionOnStreamShutdown)
checkpointAndHandleErrors(shutdownInput.checkpointer())
}
queue.complete()
consumerStats.reportShutdown(shardConsumerId, shutdownReason)
log.info(s"Finished shutting down $shardConsumerId, reason: $shutdownReason.")
}