def pollIngestStore()

in backend/app/ingestion/phase2/IngestStorePolling.scala [61:108]


  def pollIngestStore(): Unit = {
    try {
      val pollCompleteFuture: Future[FiniteDuration] = getNextBatch.fold(
        failure => {
          logger.warn(s"Failed to poll ingestion store $failure")
          metricsService.updateMetric(Metrics.batchesFailed)
          maximumWait
        },
        batch => {
          if(batch.isEmpty) {
            maximumWait
          } else {
            val results = batch.map { key =>
              logger.info(s"Processing $key")

              val result = processKey(key)
              result match {
                case Left(failure) =>
                  metricsService.updateMetric(Metrics.itemsFailed)
                  logger.warn(s"Failed to process $key: $failure. File will be moved to dead letter bucket. To re-ingest the file, " +
                    s"either re-upload it or use the /api/ingestion/retry-dead-letter-files endpoint to re-ingest all dead letter files.")
                  ingestStorage.sendToDeadLetterBucket(key)
                case _ => ingestStorage.delete(key)
              }
              result
            }.collect { case Right(success) => success }
            metricsService.updateMetrics(List(
              MetricUpdate(Metrics.itemsIngested, results.size),
              MetricUpdate(Metrics.batchesIngested, 1)))
            logger.info(s"Processed ${results.size}. Checking for work again in $minimumWait")
            minimumWait
          }
        }
      )
      pollCompleteFuture.onComplete {
        case Success(pollDuration) => schedulePoll(pollDuration)
        case SFailure(t) =>
          logger.error("Exception whilst processing ingestion batch", t)
          metricsService.updateMetric(Metrics.batchesFailed)
          schedulePoll(maximumWait)
      }
    } catch {
      case NonFatal(t) =>
        logger.error("Exception whilst getting next batch from ingestion store", t)
        metricsService.updateMetric(Metrics.batchesFailed)
        schedulePoll(maximumWait)
    }
  }