private def blockToEnqueueAndHandleResult()

in src/main/scala/com/gu/kinesis/RecordProcessorImpl.scala [185:209]


  /**
    * Tracks every record in the batch, offers the whole batch to `queue`, and blocks the
    * calling thread (with no timeout) until the offer completes.
    *
    * Outcome handling:
    *  - `Enqueued` and `QueueClosed` require no further action.
    *  - `Dropped` aborts the stream with an `AssertionError`; per that error's message the
    *    queue is expected to use `OverflowStrategy.Backpressure`, under which nothing should
    *    be dropped.
    *  - `Failure` aborts the stream with the reported cause.
    *
    * Any non-fatal exception raised while tracking, offering, waiting, or aborting also
    * aborts the stream through `streamKillSwitch`; fatal errors are left to propagate.
    *
    * @param kinesisRecords the batch to track and enqueue as a single queue element
    */
  private def blockToEnqueueAndHandleResult(kinesisRecords: IndexedSeq[KinesisRecord]): Unit = {
    try {
      for (record <- kinesisRecords) consumerStats.trackRecord(shardConsumerId, record)

      // Intentionally blocking: the caller must not proceed until the queue has accepted
      // (or rejected) this batch.
      val offerOutcome = Await.result(
        consumerStats.trackBatchEnqueue(shardConsumerId, kinesisRecords.size) {
          queue.offer(kinesisRecords)
        },
        Duration.Inf
      )

      // Reduce the outcome to "the reason to abort, if any" so the abort happens in one place.
      val abortCause: Option[Throwable] = offerOutcome match {
        case QueueOfferResult.Enqueued | QueueOfferResult.QueueClosed =>
          None

        case QueueOfferResult.Dropped =>
          Some(new AssertionError("RecordProcessor source queue must use `OverflowStrategy.Backpressure`."))

        case QueueOfferResult.Failure(cause) =>
          Some(cause)
      }

      abortCause.foreach(cause => streamKillSwitch.abort(cause))
    } catch {
      case NonFatal(error) => streamKillSwitch.abort(error)
    }
  }