backend/app/extraction/Worker.scala:
package extraction

import cats.syntax.either._
import extraction.Worker.Batch
import model.manifest.{Blob, WorkItem}
import services.manifest.WorkerManifest
import services.observability._
import services.{Metrics, MetricsService, ObjectStorage}
import utils.Logging
import utils.attempt._

import java.io.InputStream
import scala.concurrent.ExecutionContext
import scala.language.postfixOps
import scala.util.control.NonFatal
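
// A Batch pairs each item of fetched work with the extractor that will run over it and the
// parameters (ingestion, languages, parent blobs, workspace) the extraction needs.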
object Worker extends Logging {
  type Batch = List[(Extractor, Blob, ExtractionParams)]
}
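
/**
 * Polls the manifest for outstanding extraction work, fetches each blob from object storage,
 * runs the matching extractor over it, and records the outcome in the manifest, the metrics
 * service and the ingestion events (observability) store in Postgres.
 */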
class Worker(
  val name: String,
  manifest: WorkerManifest,
  blobStorage: ObjectStorage,
  extractors: List[Extractor],
  metricsService: MetricsService,
  postgresClient: PostgresClient)(implicit executionContext: ExecutionContext) extends Logging {

  private val maxBatchSize = 1000 // tasks
  private val maxCost = 100 * 1024 * 1024 // 100MB
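
  /**
   * Fetch a batch of work and execute it, releasing this worker's locks whether or not the
   * batch succeeded, and returning the number of items that completed successfully.
   */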
  def pollAndExecute(): Attempt[Int] = {
    fetchBatch().map { work =>
      val completed = executeBatch(work)
      manifest.releaseLocks(name)
      completed
    }.recoverWith {
      case err =>
        metricsService.updateMetric(Metrics.batchesFailed)
        // on failure, fetchBatch just returns the first failure
        logger.error("Error executing batch", err)
        manifest.releaseLocks(name)
        Attempt.Left(err)
    }
  }
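
  /**
   * Ask the manifest for a batch of work (bounded by maxBatchSize and maxCost) and pair each
   * work item with the extractor registered under its extractor name. An unknown extractor
   * name is logged against the blob and fails the batch (the first failure is returned).
   */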
  def fetchBatch(): Attempt[Batch] = {
    logger.info("Fetching work")

    manifest.fetchWork(name, maxBatchSize, maxCost).toAttempt.flatMap { work =>
      Attempt.traverse(work) {
        case WorkItem(blob, parentBlobs, extractorName, ingestion, languages, workspace) =>
          extractors.find(_.name == extractorName) match {
            case Some(extractor) =>
              Attempt.Right((extractor, blob, ExtractionParams(ingestion, languages, parentBlobs, workspace)))
            case _ =>
              val failureMsg = s"Unknown extractor $extractorName"
              logger.error(failureMsg)
              manifest.logExtractionFailure(blob.uri, extractorName, failureMsg)
              Attempt.Left(UnsupportedOperationFailure(failureMsg))
          }
      }
    }
  }
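
  /**
   * Run each (extractor, blob) assignment in turn, recording ingestion events in Postgres as
   * extractions start, succeed or fail. A failure on one item is logged and counted but does
   * not stop the rest of the batch; returns the number of items that completed successfully.
   */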
  def executeBatch(work: Batch): Int = {
    if (work.nonEmpty) {
      logger.info(s"Found work: ${work.size} assignments")
    } else {
      logger.info("No work found")
    }

    work.foldLeft(0) { case (completed, (extractor, blob, params)) =>
      logger.info(s"Working on ${blob.uri.value} with ${extractor.name}")

      val observabilityMetadata = EventMetadata(blob.uri.value, params.ingestion)
      postgresClient.insertEvent(
        IngestionEvent(
          observabilityMetadata,
          IngestionEventType.RunExtractor,
          status = EventStatus.Started,
          details = EventDetails.extractorDetails(extractor.name))
      )

      val result = blobStorage.get(blob.uri.toStoragePath)
        .flatMap(safeInvokeExtractor(params, extractor, blob, _))

      result match {
        case Right(_) =>
          if (extractor.external) {
            markExternalAsProcessing(params, blob, extractor)
          } else {
            markAsComplete(params, blob, extractor)
            postgresClient.insertEvent(
              IngestionEvent(
                observabilityMetadata,
                IngestionEventType.RunExtractor,
                status = EventStatus.Success,
                details = EventDetails.extractorDetails(extractor.name))
            )
          }
          completed + 1
        case Left(SubprocessInterruptedFailure) =>
          logger.info(s"Subprocess terminated while processing ${blob.uri.value}")
          logger.info("We expect this to happen when a worker instance is terminated midway through a job")
          logger.info("Not marking this as an extraction failure, so that a new worker can pick up the work")
          completed
        case Left(failure) =>
          markAsFailure(blob, extractor, failure)
          metricsService.updateMetric(Metrics.itemsFailed)
          logger.error(s"Ingest batch execution failure, ${failure.msg}", failure.toThrowable)

          postgresClient.insertEvent(
            IngestionEvent(
              observabilityMetadata,
              IngestionEventType.RunExtractor,
              EventStatus.Failure,
              details = EventDetails.extractorErrorDetails(
                extractor.name, failure.msg, failure.toThrowable.getStackTrace.map(element => element.toString).mkString("\n")
              )
            )
          )

          completed
      }
    }
  }

  // Ideally the extractor is very well behaved, catches any exceptions and wraps them in an Either...
  // but that's not always going to be true (especially if we had true pluggable extractors).
  // This code is a safety catch-all so that we don't throw away the rest of the batch.
  private def safeInvokeExtractor(params: ExtractionParams, extractor: Extractor, blob: Blob, data: InputStream): Either[Failure, Unit] = try {
    extractor.extract(blob, data, params)
  } catch {
    case NonFatal(e) =>
      Left(UnknownFailure(e))
  }
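
  // The mark* helpers below update the manifest after an extraction; if that update fails we
  // log the error but don't propagate it, so one bad manifest write doesn't fail the batch.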
  private def markAsComplete(params: ExtractionParams, blob: Blob, extractor: Extractor): Unit = {
    manifest.markAsComplete(params, blob, extractor).leftMap { failure =>
      logger.error(s"Failed to mark '${blob.uri.value}' processed by '${extractor.name}' as complete: ${failure.msg}")
    }
  }
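
  // External extractors are marked as "processing" rather than complete, presumably because
  // their work is finished later by an external service rather than in this worker.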
  private def markExternalAsProcessing(params: ExtractionParams, blob: Blob, extractor: Extractor): Unit = {
    manifest.markExternalAsProcessing(params, blob, extractor).leftMap { failure =>
      logger.error(s"Failed to mark '${blob.uri.value}' processed by '${extractor.name}' as processing: ${failure.msg}")
    }
  }
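
  // Record the failure against the blob in the manifest; if even that fails, all we can do is log it.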
  private def markAsFailure(blob: Blob, extractor: Extractor, failure: Failure): Unit = {
    logger.error(s"Error in '${extractor.name}' processing '${blob.uri.value}': ${failure.msg}")
    manifest.logExtractionFailure(blob.uri, extractor.name, failure.msg).left.foreach { f =>
      logger.error(s"Failed to log extraction failure in manifest: ${f.msg}")
    }
  }
}