override def fetchWork()

in backend/app/services/manifest/Neo4jManifest.scala [441:528]


  /**
    * Claims a batch of pending extraction tasks for a worker and returns them as work items.
    *
    * Everything happens in a single Cypher statement run inside `transaction`:
    *  - a `Worker` node named `workerName` is created if it does not already exist;
    *  - candidate tasks are `TODO` relationships from an `Extractor` to a `Blob:Resource` that has
    *    no `LOCKED_BY` relationship to any `Worker` and whose `attempts` is below
    *    `maxExtractionAttempts` (defined outside this method);
    *  - candidates are ordered by `coalesce(todo.priority, e.priority)` descending and cut off at
    *    `maxBatchSize`;
    *  - the ordered candidates are folded into a batch by cost: the first one is always taken, and
    *    each later one is skipped when adding its cost would bring the running total to `maxCost`
    *    or above (a cheaper candidate further down the list can still be taken);
    *  - for every task in the batch whose blob has a `TYPE_OF` link to a `MimeType`, the todo's
    *    `attempts` is incremented and a `LOCKED_BY` relationship from the blob to the worker is
    *    merged.
    *
    * @param workerName   name of the worker claiming the work
    * @param maxBatchSize upper bound on the number of candidate tasks considered
    * @param maxCost      cost ceiling used when folding candidates into the batch
    * @return `Right` with one [[WorkItem]] per returned row. Any `Left` would have to come from
    *         `transaction`, whose implementation is not visible here.
    * @throws IllegalStateException if a returned row has a null `languages` or `parentBlobs`
    *                               (raised inside the transaction body; how `transaction` surfaces
    *                               it is not visible here)
    */
  override def fetchWork(workerName: String, maxBatchSize: Int, maxCost: Int): Either[Failure, List[WorkItem]] = transaction { tx =>
    val claimQuery =
      s"""
        |MERGE (worker:Worker {name: {workerName}})
        |  WITH
        |    worker
        |
        |MATCH (e: Extractor)-[todo: TODO]->(b: Blob:Resource)
        |  WHERE
        |    NOT (b)-[:LOCKED_BY]->(:Worker) AND todo.attempts < {maxExtractionAttempts}
        |
        |  WITH worker, todo, e, b
        |  // priority was originally just defined for extractors, we later extended it out to todos as well
        |  // This maintains roll forward/backward compatibility with both
        |  ORDER BY coalesce(todo.priority, e.priority) DESC
        |  LIMIT {maxBatchSize}
        |
        |WITH collect({todo: todo, extractor: e, blob: b, worker: worker}) as allTasks
        |WITH tail(reduce(acc = [0, []], task in allTasks |
        |    case
        |      when size(acc[1]) > 0 AND (acc[0] + task.todo.cost) >= {maxCost}
        |        then [acc[0], acc[1]]
        |      else
        |        [acc[0] + task.todo.cost, acc[1] + task]
        |     end
        |  )) as tasks
        |
        |UNWIND tasks[0] as task
        |  MATCH (blob: Blob:Resource { uri: task.blob.uri })-[:TYPE_OF]-(m: MimeType)
        |  MATCH (worker :Worker { name: task.worker.name })
        |
        |  SET task.todo.attempts = task.todo.attempts + 1
        |  MERGE (blob)-[:LOCKED_BY]->(worker)
        |
        |RETURN
        |    blob,
        |    collect(m) as types,
        |    task.extractor.name as extractorName,
        |    task.todo.ingestion as ingestion,
        |    task.todo.languages as languages,
        |    task.todo.parentBlobs as parentBlobs,
        |    task.todo.workspaceId as workspaceId,
        |    task.todo.workspaceNodeId as workspaceNodeId,
        |    task.todo.workspaceBlobUri as workspaceBlobUri
      """.stripMargin

    val queryParameters = parameters(
      "workerName", workerName,
      "maxExtractionAttempts", Int.box(maxExtractionAttempts),
      "maxBatchSize", Int.box(maxBatchSize),
      "maxCost", Int.box(maxCost)
    )

    val result = tx.run(claimQuery, queryParameters)

    val workItems = for (record <- result.list().asScala.toList) yield {
      // Decode the mandatory columns first; they are also needed for the diagnostic message below.
      val blob = Blob.fromNeo4jValue(record.get("blob"), record.get("types").values().asScala.toSeq)
      val extractorName = record.get("extractorName").asString()
      val ingestion = record.get("ingestion").asString()

      // A workspace context is only attached when all three workspace columns are present.
      val workspaceId = record.get("workspaceId")
      val workspaceNodeId = record.get("workspaceNodeId")
      val workspaceBlobUri = record.get("workspaceBlobUri")
      val workspaceIncomplete = Seq(workspaceId, workspaceNodeId, workspaceBlobUri).exists(_.isNull)

      val workspace =
        if (workspaceIncomplete) None
        else Some(WorkspaceItemContext(workspaceId.asString(), workspaceNodeId.asString(), workspaceBlobUri.asString()))

      val languagesValue = record.get("languages")
      val parentBlobsValue = record.get("parentBlobs")

      // Guard: both list columns must be present; log the full row before failing.
      if (languagesValue.isNull || parentBlobsValue.isNull) {
        val message =
          s"NULL languages or parentBlobs! blob: ${blob}. extractorName: ${extractorName}. " +
            s"ingestion: ${ingestion}. rawParentBlobs: ${parentBlobsValue}. rawLanguages: ${languagesValue}. " +
            s"workspaceId: ${workspaceId}. workspaceNodeId: ${workspaceNodeId}. workspaceBlobUri: ${workspaceBlobUri}"
        logger.error(message)

        throw new IllegalStateException(message)
      }

      // Keys for which Languages.getByKey yields nothing are dropped by the flatMap
      // (presumably it returns an Option - confirm against Languages).
      val languageKeys = languagesValue.asList(_.asString).asScala.toList
      val languages = languageKeys.flatMap(Languages.getByKey)

      val parentBlobUris = parentBlobsValue.asList(_.asString, new java.util.ArrayList[String]()).asScala.toList
      val parentBlobs: List[Uri] = parentBlobUris.map(uri => Uri(uri))

      WorkItem(blob, parentBlobs, extractorName, ingestion, languages, workspace)
    }

    Right(workItems)
  }