def executeBatch()

in backend/app/extraction/Worker.scala [67:132]


  /**
    * Runs every assignment in `work`, one after another, and reports how many succeeded.
    *
    * For each `(extractor, blob, params)` assignment a `RunExtractor` event with status
    * `Started` is recorded, the blob is fetched from blob storage and passed to the extractor.
    * The outcome is then handled as follows:
    *  - success: an external extractor is marked as processing (no `Success` event is
    *    recorded here); any other extractor is marked complete and a `Success` event is recorded.
    *  - `SubprocessInterruptedFailure`: only logged; nothing is marked and no event is recorded.
    *  - any other failure: the assignment is marked as failed, the `itemsFailed` metric is
    *    updated, the error is logged and a `Failure` event with message and stack trace is recorded.
    *
    * @param work the assignments to execute
    * @return the number of assignments whose fetch-and-extract step succeeded
    *         (external extractors included)
    */
  def executeBatch(work: Batch): Int = {
    if (work.isEmpty) {
      logger.info("No work found")
    } else {
      logger.info(s"Found work: ${work.size} assignments")
    }

    work.foldLeft(0) { case (doneSoFar, (extractor, blob, params)) =>
      logger.info(s"Working on ${blob.uri.value} with ${extractor.name}")

      val eventMetadata = EventMetadata(blob.uri.value, params.ingestion)

      // A def rather than a val so the details are built afresh for each event that uses them.
      def extractorDetails = EventDetails.extractorDetails(extractor.name)

      val startedEvent = IngestionEvent(
        eventMetadata,
        IngestionEventType.RunExtractor,
        status = EventStatus.Started,
        details = extractorDetails
      )
      postgresClient.insertEvent(startedEvent)

      // Fetch the blob, then run the extractor on it; the first Left short-circuits.
      val outcome = for {
        data <- blobStorage.get(blob.uri.toStoragePath)
        extracted <- safeInvokeExtractor(params, extractor, blob, data)
      } yield extracted

      // 1 when this assignment counts as completed, 0 otherwise.
      val gained = outcome match {
        // Must be matched before the generic Left case below, which would otherwise claim it.
        case Left(SubprocessInterruptedFailure) =>
          logger.info(s"Subprocess terminated while processing ${blob.uri.value}")
          logger.info("We expect this to happen when a worker instance is terminated midway through a job")
          logger.info("I am not marking it in an extraction failure to allow a new worker to pick up the work")
          0

        case Left(failure) =>
          markAsFailure(blob, extractor, failure)
          metricsService.updateMetric(Metrics.itemsFailed)
          logger.error(s"Ingest batch execution failure, ${failure.msg}", failure.toThrowable)

          val stackTrace = failure.toThrowable.getStackTrace.map(_.toString).mkString("\n")
          val failureEvent = IngestionEvent(
            eventMetadata,
            IngestionEventType.RunExtractor,
            EventStatus.Failure,
            details = EventDetails.extractorErrorDetails(extractor.name, failure.msg, stackTrace)
          )
          postgresClient.insertEvent(failureEvent)
          0

        case Right(_) =>
          if (extractor.external) {
            // NOTE(review): presumably completion is reported later by the external
            // process, hence no Success event at this point — confirm.
            markExternalAsProcessing(params, blob, extractor)
          } else {
            markAsComplete(params, blob, extractor)
            val successEvent = IngestionEvent(
              eventMetadata,
              IngestionEventType.RunExtractor,
              status = EventStatus.Success,
              details = extractorDetails
            )
            postgresClient.insertEvent(successEvent)
          }
          1
      }

      doneSoFar + gained
    }
  }