in backend/app/ingestion/phase2/IngestStorePolling.scala [61:108]
def pollIngestStore(): Unit = {
  try {
    // Fold over the result of getNextBatch (an Attempt-style wrapper, judging by the
    // Future it yields) to decide how long to wait before the next poll
    val pollCompleteFuture: Future[FiniteDuration] = getNextBatch.fold(
      failure => {
        logger.warn(s"Failed to poll ingestion store: $failure")
        metricsService.updateMetric(Metrics.batchesFailed)
        maximumWait
      },
      batch => {
        if (batch.isEmpty) {
          // Nothing to ingest yet: back off to the longest polling interval
          maximumWait
        } else {
          val results = batch.map { key =>
            logger.info(s"Processing $key")
            val result = processKey(key)
            result match {
              case Left(failure) =>
                metricsService.updateMetric(Metrics.itemsFailed)
                logger.warn(s"Failed to process $key: $failure. File will be moved to the dead letter bucket. To re-ingest the file, " +
                  s"either re-upload it or use the /api/ingestion/retry-dead-letter-files endpoint to re-ingest all dead letter files.")
                ingestStorage.sendToDeadLetterBucket(key)
              case _ =>
                // Successfully processed: remove the file from the ingest store
                ingestStorage.delete(key)
            }
            result
          }.collect { case Right(success) => success }
          metricsService.updateMetrics(List(
            MetricUpdate(Metrics.itemsIngested, results.size),
            MetricUpdate(Metrics.batchesIngested, 1)))
          // There may be more work waiting, so come back on the short interval
          logger.info(s"Processed ${results.size} items. Checking for work again in $minimumWait")
          minimumWait
        }
      }
    )
    // SFailure is presumably scala.util.Failure, renamed on import to avoid
    // clashing with a domain Failure type
    pollCompleteFuture.onComplete {
      case Success(pollDuration) => schedulePoll(pollDuration)
      case SFailure(t) =>
        logger.error("Exception whilst processing ingestion batch", t)
        metricsService.updateMetric(Metrics.batchesFailed)
        schedulePoll(maximumWait)
    }
  } catch {
    case NonFatal(t) =>
      logger.error("Exception whilst getting next batch from ingestion store", t)
      metricsService.updateMetric(Metrics.batchesFailed)
      schedulePoll(maximumWait)
  }
}
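
For context, a minimal, self-contained sketch of the self-rescheduling polling loop this method drives. Everything below is assumed for illustration rather than taken from the real class: the single-thread scheduler, the stand-in getNextBatch that always returns an empty batch, and the wait bounds are all hypothetical, and the actual service presumably injects its own scheduler, storage, and metrics. It also shows the SFailure renaming import that the onComplete above appears to rely on.

import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure => SFailure, Success}

object PollingSketch {
  // Hypothetical stand-ins for the values the real class injects
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  implicit private val ec: ExecutionContext = ExecutionContext.global
  private val minimumWait: FiniteDuration = 1.second
  private val maximumWait: FiniteDuration = 30.seconds

  // Re-arm the poll after the requested delay; this is the role schedulePoll
  // plays in the excerpt above
  def schedulePoll(delay: FiniteDuration): Unit = {
    scheduler.schedule(new Runnable { def run(): Unit = pollOnce() },
      delay.toMillis, TimeUnit.MILLISECONDS)
    ()
  }

  // A stripped-down poll: fetch work, then reschedule based on the outcome.
  // The Future plays the part of the fold over getNextBatch.
  def pollOnce(): Unit = {
    val nextDelay: Future[FiniteDuration] = Future {
      val batch = List.empty[String] // stand-in for getNextBatch
      if (batch.isEmpty) maximumWait else minimumWait
    }
    nextDelay.onComplete {
      case Success(delay) => schedulePoll(delay)
      case SFailure(_)    => schedulePoll(maximumWait) // back off on error
    }
  }

  // The scheduler's non-daemon thread keeps the loop (and the JVM) alive
  def main(args: Array[String]): Unit = pollOnce()
}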