in backend/app/extraction/Worker.scala [67:132]
/** Executes every (extractor, blob, params) assignment in the batch, in order,
  * emitting an ingestion event before and after each extractor run.
  *
  * Returns the number of assignments that reached a terminal state (success or
  * recorded failure). Assignments cut short by subprocess termination are NOT
  * counted or marked failed, so a replacement worker can pick them up again.
  */
def executeBatch(work: Batch): Int = {
  if (work.nonEmpty) logger.info(s"Found work: ${work.size} assignments")
  else logger.info("No work found")

  work.foldLeft(0) { case (doneSoFar, (extractor, blob, params)) =>
    logger.info(s"Working on ${blob.uri.value} with ${extractor.name}")
    val eventMeta = EventMetadata(blob.uri.value, params.ingestion)

    // Record the attempt before touching storage, so interrupted runs are visible.
    postgresClient.insertEvent(
      IngestionEvent(
        eventMeta,
        IngestionEventType.RunExtractor,
        status = EventStatus.Started,
        details = EventDetails.extractorDetails(extractor.name))
    )

    val outcome = blobStorage
      .get(blob.uri.toStoragePath)
      .flatMap(safeInvokeExtractor(params, extractor, blob, _))

    outcome match {
      case Right(_) =>
        if (extractor.external) {
          // External extractors finish asynchronously; no Success event yet.
          markExternalAsProcessing(params, blob, extractor)
        } else {
          markAsComplete(params, blob, extractor)
          postgresClient.insertEvent(
            IngestionEvent(
              eventMeta,
              IngestionEventType.RunExtractor,
              status = EventStatus.Success,
              details = EventDetails.extractorDetails(extractor.name))
          )
        }
        doneSoFar + 1

      case Left(SubprocessInterruptedFailure) =>
        // Deliberately not marked as a failure: leave the work claimable.
        logger.info(s"Subprocess terminated while processing ${blob.uri.value}")
        logger.info("We expect this to happen when a worker instance is terminated midway through a job")
        logger.info("I am not marking it in an extraction failure to allow a new worker to pick up the work")
        doneSoFar

      case Left(failure) =>
        markAsFailure(blob, extractor, failure)
        metricsService.updateMetric(Metrics.itemsFailed)
        logger.error(s"Ingest batch execution failure, ${failure.msg}", failure.toThrowable)
        val stackTrace = failure.toThrowable.getStackTrace.map(_.toString).mkString("\n")
        postgresClient.insertEvent(
          IngestionEvent(
            eventMeta,
            IngestionEventType.RunExtractor,
            EventStatus.Failure,
            details = EventDetails.extractorErrorDetails(extractor.name, failure.msg, stackTrace)
          )
        )
        doneSoFar
    }
  }
}