private def shutdown()

in src/main/scala/com/gu/kinesis/RecordProcessorImpl.scala [160:183]


  /** Runs the shutdown sequence for this shard consumer.
    *
    * The work done before completion depends on the shutdown reason carried by `shutdownInput`:
    *  - `REQUESTED`: waits for in-flight records, bounded by
    *    `shardCheckpointConfig.maxWaitForCompletionOnStreamShutdown` (and skipped when the stream has failed),
    *    then checkpoints.
    *  - `SHARD_END`: waits for in-flight records (or termination), then checkpoints with `shardEnd = true`.
    *  - `LEASE_LOST`: neither waits nor checkpoints.
    *
    * For every reason the queue is then completed, the shutdown is reported to `consumerStats`, and an info
    * message is logged.
    *
    * @param shutdownInput supplies the shutdown reason and the checkpointer used for the final checkpoint
    */
  private def shutdown(shutdownInput: ShutdownInput): Unit = {
    val shutdownReason = shutdownInput.shutdownReason()

    shutdownReason match {
      case ShutdownReason.REQUESTED =>
        // A requested shutdown may be caused by a stream failure or by downstream cancellation. Either way some
        // records will never be processed, so the wait must not block until every record completes. When the
        // stream is known to have failed and is being torn down, waiting for in-flight records is pointless.
        waitForInFlightRecordsUnlessStreamFailed(shardCheckpointConfig.maxWaitForCompletionOnStreamShutdown)
        checkpointAndHandleErrors(shutdownInput.checkpointer())

      case ShutdownReason.SHARD_END =>
        // Let outstanding records finish (or termination occur) before the shard-end checkpoint.
        waitForInFlightRecordsOrTermination()
        checkpointAndHandleErrors(shutdownInput.checkpointer(), shardEnd = true)

      case ShutdownReason.LEASE_LOST =>
        // Nothing to wait for and nothing to checkpoint.
        ()
    }

    // Common tail, executed regardless of the shutdown reason.
    queue.complete()
    consumerStats.reportShutdown(shardConsumerId, shutdownReason)
    log.info(s"Finished shutting down $shardConsumerId, reason: $shutdownReason.")
  }