in backend/app/services/manifest/Neo4jManifest.scala [441:528]
/** Claims a batch of pending extraction work for `workerName`.
  *
  * The query:
  *  1. MERGEs a `Worker` node named `workerName`.
  *  2. Selects `TODO` relationships from an `Extractor` to a `Blob:Resource` where the blob is not
  *     `LOCKED_BY` any worker and `todo.attempts` is below `maxExtractionAttempts`. Candidates are
  *     ordered by `coalesce(todo.priority, e.priority)` descending and limited to `maxBatchSize`.
  *  3. Greedily packs those candidates, in that order, into the `maxCost` budget. A task is skipped
  *     when adding its `cost` would bring the running total to `maxCost` or more. The exception is
  *     the first task, which is always accepted, so a single oversized task can still be fetched.
  *     Skipping does not stop the fold: later, cheaper tasks may still be accepted.
  *  4. For each accepted task whose blob has at least one `MimeType`, increments `todo.attempts`
  *     and MERGEs a `LOCKED_BY` relationship from the blob to the worker. Accepted tasks whose blob
  *     has no `MimeType` are dropped by the MATCH; they are neither locked nor returned.
  *
  * Mime types are aggregated before the SET. The `TYPE_OF` MATCH yields one row per mime type, and a
  * SET evaluated per row would increment `attempts` once per mime type rather than once per task.
  *
  * @param workerName   name of the claiming worker; its `Worker` node is created if missing
  * @param maxBatchSize maximum number of candidate TODOs considered before cost packing
  * @param maxCost      cost budget for the batch (see step 3)
  * @return `Right` with one [[WorkItem]] per claimed task
  * @throws IllegalStateException if a claimed TODO has a null `languages` or `parentBlobs` property
  *         (whether the surrounding `transaction` converts this into a `Left` is not visible here)
  */
override def fetchWork(workerName: String, maxBatchSize: Int, maxCost: Int): Either[Failure, List[WorkItem]] = transaction { tx =>
  // Plain (non-interpolated) string: the query uses driver parameters only, never Scala values.
  val result = tx.run(
    """
      |MERGE (worker:Worker {name: {workerName}})
      | WITH
      | worker
      |
      |MATCH (e: Extractor)-[todo: TODO]->(b: Blob:Resource)
      | WHERE
      | NOT (b)-[:LOCKED_BY]->(:Worker) AND todo.attempts < {maxExtractionAttempts}
      |
      | WITH worker, todo, e, b
      | // priority was originally just defined for extractors, we later extended it out to todos as well
      | // This maintains roll forward/backward compatibility with both
      | ORDER BY coalesce(todo.priority, e.priority) DESC
      | LIMIT {maxBatchSize}
      |
      |WITH collect({todo: todo, extractor: e, blob: b, worker: worker}) as allTasks
      |// acc = [runningCost, acceptedTasks]; tail(...) leaves [acceptedTasks]
      |WITH tail(reduce(acc = [0, []], task in allTasks |
      | case
      | when size(acc[1]) > 0 AND (acc[0] + task.todo.cost) >= {maxCost}
      | then [acc[0], acc[1]]
      | else
      | [acc[0] + task.todo.cost, acc[1] + task]
      | end
      | )) as tasks
      |
      |UNWIND tasks[0] as task
      | MATCH (blob: Blob:Resource { uri: task.blob.uri })-[:TYPE_OF]-(m: MimeType)
      | MATCH (worker :Worker { name: task.worker.name })
      | // Collapse to one row per task so the SET below increments attempts exactly once
      | WITH blob, worker, task.todo as todo, task.extractor as extractor, collect(m) as types
      |
      | SET todo.attempts = todo.attempts + 1
      | MERGE (blob)-[:LOCKED_BY]->(worker)
      |
      |RETURN
      | blob,
      | types,
      | extractor.name as extractorName,
      | todo.ingestion as ingestion,
      | todo.languages as languages,
      | todo.parentBlobs as parentBlobs,
      | todo.workspaceId as workspaceId,
      | todo.workspaceNodeId as workspaceNodeId,
      | todo.workspaceBlobUri as workspaceBlobUri
    """.stripMargin,
    parameters(
      "workerName", workerName,
      "maxExtractionAttempts", Int.box(maxExtractionAttempts),
      "maxBatchSize", Int.box(maxBatchSize),
      "maxCost", Int.box(maxCost)
    )
  )

  Right(result.list().asScala.toList.map { r =>
    val blob = Blob.fromNeo4jValue(r.get("blob"), r.get("types").values().asScala.toSeq)
    val extractorName = r.get("extractorName").asString()
    val ingestion = r.get("ingestion").asString()

    val workspaceId = r.get("workspaceId")
    val workspaceNodeId = r.get("workspaceNodeId")
    val workspaceBlobUri = r.get("workspaceBlobUri")
    // A workspace context is attached only when all three workspace properties are present.
    val workspace =
      if (workspaceId.isNull || workspaceNodeId.isNull || workspaceBlobUri.isNull) None
      else Some(WorkspaceItemContext(workspaceId.asString(), workspaceNodeId.asString(), workspaceBlobUri.asString()))

    val rawLanguages = r.get("languages")
    val rawParentBlobs = r.get("parentBlobs")
    if (rawLanguages.isNull || rawParentBlobs.isNull) {
      val message = s"NULL languages or parentBlobs! blob: ${blob}. extractorName: ${extractorName}. ingestion: ${ingestion}. rawParentBlobs: ${rawParentBlobs}. rawLanguages: ${rawLanguages}. workspaceId: ${workspaceId}. workspaceNodeId: ${workspaceNodeId}. workspaceBlobUri: ${workspaceBlobUri}"
      logger.error(message)
      throw new IllegalStateException(message)
    }

    // Language keys for which Languages.getByKey yields nothing are dropped by the flatMap.
    val languages = rawLanguages.asList(_.asString).asScala.toList.flatMap(Languages.getByKey)
    // Both values are known non-null here, so no default list is needed.
    val parentBlobs: List[Uri] = rawParentBlobs.asList(_.asString).asScala.toList.map(Uri(_))
    WorkItem(blob, parentBlobs, extractorName, ingestion, languages, workspace)
  })
}