backend/app/services/manifest/Neo4jManifest.scala
package services.manifest
import cats.instances.either._
import cats.instances.list._
import cats.syntax.traverse._
import commands.IngestFileResult
import extraction.{ExtractionParams, Extractor}
import model._
import model.annotations.WorkspaceMetadata
import model.frontend.email.EmailNeighbours
import model.frontend.{BasicResource, ExtractionFailureSummary, ExtractionFailures, ResourcesForExtractionFailure}
import model.ingestion.{IngestionFile, WorkspaceItemContext, WorkspaceItemUploadContext}
import model.manifest._
import model.user.DBUser
import org.joda.time.DateTime
import org.neo4j.driver.v1.Values.parameters
import org.neo4j.driver.v1.{Driver, StatementResult, StatementRunner, Value}
import services.Neo4jQueryLoggingConfig
import services.manifest.Manifest.WorkCounts
import utils._
import utils.attempt.{Attempt, Failure, IllegalStateFailure, NotFoundFailure}
import java.nio.file.Path
import java.time.Instant
import scala.concurrent.ExecutionContext
import scala.jdk.CollectionConverters._
object Neo4jManifest {
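// Builds a Neo4jManifest and eagerly runs the schema setup, so callers get back either
// a ready-to-use Manifest or the Failure produced while creating the constraints.
//
// A minimal wiring sketch (assuming a configured Driver, ExecutionContext and
// Neo4jQueryLoggingConfig are already in scope; the value names below are illustrative only):
//
//   val manifestOrError: Either[Failure, Manifest] =
//     Neo4jManifest.setupManifest(driver, executionContext, queryLoggingConfig)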
def setupManifest(driver: Driver, executionContext: ExecutionContext, queryLoggingConfig: Neo4jQueryLoggingConfig): Either[Failure, Manifest] = {
val neo4jManifest = new Neo4jManifest(driver, executionContext, queryLoggingConfig)
neo4jManifest.setup().map(_ => neo4jManifest)
}
}
class Neo4jManifest(driver: Driver, executionContext: ExecutionContext, queryLoggingConfig: Neo4jQueryLoggingConfig)
extends Neo4jHelper(driver, executionContext, queryLoggingConfig) with Manifest with Logging {
import Neo4jHelper._
implicit val ec: ExecutionContext = executionContext
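// Creates the uniqueness constraints the rest of the queries rely on
// (Resource.uri, Extractor.name and MimeType.mimeType). Run once at startup via setupManifest.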
override def setup(): Either[Failure, Unit] = transaction { tx =>
tx.run("CREATE CONSTRAINT ON (resource: Resource) ASSERT resource.uri IS UNIQUE")
tx.run("CREATE CONSTRAINT ON (extractor: Extractor) ASSERT extractor.name IS UNIQUE")
tx.run("CREATE CONSTRAINT ON (tpe: MimeType) ASSERT tpe.mimeType IS UNIQUE")
Right(())
}
override def insertCollection(uri: String, display: String, createdBy: String): Attempt[Collection] = attemptTransaction { tx =>
tx.run("""
CREATE (c:Collection:Resource {uri: {uri}, display: {display}, createdBy: {createdBy}})
RETURN c
""",
parameters(
"uri", uri,
"display", display,
"createdBy", createdBy
)
).map(result => Collection.fromNeo4jValue(result.single.get("c")))
}
override def getCollections: Attempt[List[Collection]] = attemptTransaction { tx =>
val statementResult = tx.run(
"""
|MATCH (c:Collection)
|OPTIONAL MATCH (c)<-[:PARENT]-(i: Ingestion)
|RETURN c, i
""".stripMargin)
for {
summary <- statementResult
results = summary.list().asScala.toList
} yield {
Collection.mergeCollectionsAndIngestions(results)
}
}
override def getCollectionsForBlob(blobUri: String): Attempt[Map[Collection, Seq[String]]] = attemptTransaction { tx =>
val statementResult = tx.run(
"""
|MATCH (b:Blob:Resource {uri: {blob}})-[r:PARENT*]->(c:Collection)
|OPTIONAL MATCH (u:User)-[:CAN_SEE]->(c:Collection)
|RETURN DISTINCT c, COLLECT(DISTINCT u.username) as usernames
""".stripMargin,
parameters(
"blob", blobUri,
))
for {
summary <- statementResult
results = summary.list().asScala.toList
} yield {
Collection.mergeCollectionAndUsers(results)
}
}
override def getWorkspacesForBlob(blobUri: String): Attempt[List[WorkspaceMetadata]] = attemptTransaction { tx =>
tx.run(
"""
|MATCH (w:WorkspaceNode {uri: {uri}})-[:PART_OF]->(workspace:Workspace)
|MATCH (creator :User)-[:CREATED]->(workspace)<-[:FOLLOWING]-(follower :User)
|return workspace, creator, collect(distinct follower) as followers
|""".stripMargin,
parameters(
"uri", blobUri,
)).map { summary =>
summary.list().asScala.toList.map { r =>
val workspace = r.get("workspace")
val creator = DBUser.fromNeo4jValue(r.get("creator"))
val followers = r.get("followers").asList[DBUser](DBUser.fromNeo4jValue(_)).asScala.toList
WorkspaceMetadata.fromNeo4jValue(workspace, creator, followers)
}
}
}
override def getCollection(collection: Uri): Attempt[Collection] = attemptTransaction { tx =>
val statementResult = tx.run(
"""
|MATCH (c:Collection {uri: {collectionName}})
|OPTIONAL MATCH (c)<-[:PARENT]-(i: Ingestion)
|RETURN c, i
""".stripMargin,
parameters("collectionName", collection.value))
for {
summary <- statementResult
results = summary.list().asScala.toList
collection <- Collection.mergeCollectionsAndIngestions(results).find(_.uri == collection) match {
case Some(c) => Attempt.Right(c)
case None => Attempt.Left(NotFoundFailure(s"Collection $collection does not exist."))
}
} yield collection
}
override def getResource(resourceUri: Uri): Either[Failure, BasicResource] = transaction { tx =>
val result = tx.run(
"""
|MATCH (resource: Resource {uri: {resourceUri}})
|OPTIONAL MATCH (parents: Resource)<-[:PARENT]-(resource)
|OPTIONAL MATCH (resource)<-[:PARENT]-(children: Resource)
|RETURN resource, parents, children
""".stripMargin,
parameters("resourceUri", resourceUri.value))
val failureOrResults = result.list().asScala.hasKeyOrFailure("resource", s"Resource '${resourceUri.value}' does not exist.")
failureOrResults.map{ results =>
val resource = results.map(r => r.get("resource")).toList.distinct
val children = results.filter(r => r.containsKey("children")).map(r => r.get("children")).filter(v => !v.isNull)
.toList.distinct
val parents = results.filter(r => r.containsKey("parents")).map(r => r.get("parents")).filter(v => !v.isNull)
.toList.distinct
BasicResource.fromNeo4jValues(resource.head, parents, children)
}
}
override def getIngestions(collection: Uri): Attempt[Seq[Ingestion]] = attemptTransaction { tx =>
val statementResult = tx.run("MATCH (i :Ingestion)-[:PARENT]->(c :Collection {uri: {collectionName}}) RETURN i, c",
parameters("collectionName", collection.value))
for {
summary <- statementResult
results <- summary.list().asScala.toList.hasKeyOrFailure("c", NotFoundFailure(s"Collection '$collection' does not exist."))
} yield results.map(r => Ingestion.fromNeo4jValue(r.get("i")))
}
override def getIngestionCount(collection: Uri): Attempt[Int] = attemptTransaction { tx =>
val statementResult = tx.run(
"""
|MATCH (c: Collection {uri: {collection}})
|OPTIONAL MATCH (c)<-[:PARENT]-(i: Ingestion)
|RETURN c, COUNT(i)
""".stripMargin,
parameters("collection", collection.value))
for {
summary <- statementResult
records <- summary.list().asScala.hasKeyOrFailure("c", NotFoundFailure(s"Collection '$collection' does not exist."))
} yield {
records.find(r => r.containsKey("COUNT(i)")).map(r => r.get("COUNT(i)").asInt()).getOrElse(0)
}
}
override def getIngestion(uri: Uri): Attempt[Ingestion] = attemptTransaction { tx =>
val statementResult = tx.run(
"""
| MATCH (i: Ingestion { uri: {uri} }) RETURN i
""".stripMargin,
parameters("uri", uri.value)
)
for {
summary <- statementResult
records <- summary.list().asScala.hasKeyOrFailure("i", NotFoundFailure(s"Ingestion '$uri' does not exist"))
} yield {
Ingestion.fromNeo4jValue(records.head.get("i"))
}
}
override def insertIngestion(collectionUri: Uri, ingestionUri: Uri, display: String, path: Option[Path], languages: List[Language], fixed: Boolean, default: Boolean): Attempt[Uri] = attemptTransaction { tx =>
val statementResult = tx.run(
"""
|MATCH (c: Collection {uri: {collectionUri}})
|
|CREATE (i:Ingestion:Resource {
| uri: {uri},
| display: {display},
| startTime: {startTime},
| endTime: {endTime},
| path: {path},
| languages: {languages},
| fixed: {fixed},
| default: {default}
|})-[:PARENT]->(c)
|RETURN i, c
""".stripMargin,
parameters(
"collectionUri", collectionUri.value,
"uri", ingestionUri.value,
"display", display,
"path", path.map(_.toAbsolutePath.toString).orNull,
"startTime", Instant.now.toEpochMilli.asInstanceOf[java.lang.Long],
"endTime", null,
"languages", languages.map(_.key).asJava,
"fixed", Boolean.box(fixed),
"default", Boolean.box(default)
)
)
for {
summary <- statementResult
_ <- summary.list().asScala.hasKeyOrFailure("c", NotFoundFailure(s"Collection '${collectionUri.value}' does not exist."))
} yield ingestionUri
}
def markResourceAsExpandable(tx: StatementRunner, resourceUri: Uri): Either[Failure, Unit] = {
// If the resource at the uri is a blob, both the blob and its parent :File are marked expandable;
// otherwise only the resource itself is marked expandable.
// * = isExpandable
//
// (parent:File)*->(resource:Blob)* ->(file:File)->(blob:Blob)
// (resource:Email)* ->(file:File)->(blob:Blob)
// (resource:Directory)*->(file:File)->(blob:Blob)
// (resource:Ingestion)*->(file:File)->(blob:Blob)
tx.run(
"""
|MATCH (resource :Resource {uri: {resourceUri}})
|SET resource.isExpandable = true
""".stripMargin,
parameters(
"resourceUri", resourceUri.value
)
)
tx.run(
"""
|MATCH (blob :Blob:Resource {uri: {resourceUri}})
|MATCH (parent :File:Resource)<-[:PARENT]-(blob)
|SET parent.isExpandable = true
""".stripMargin,
parameters(
"resourceUri", resourceUri.value
)
)
Right(())
}
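// If the blob at blobUri has already been marked expandable, propagate the flag to its
// parent :File nodes. Used by insertBlob when an existing blob gains a new parent file.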
def markParentFileAsExpandableIfBlobIsExpandable(tx: StatementRunner, blobUri: Uri): Either[Failure, Unit] = {
tx.run(
"""
|MATCH (blob :Blob:Resource {isExpandable: true, uri: {blobUri}})
|MATCH (parent :File:Resource)<-[:PARENT]-(blob)
|SET parent.isExpandable = true
""".stripMargin,
parameters(
"blobUri", blobUri.value
)
)
Right(())
}
def insertDirectory(tx: StatementRunner, parentUri: Uri, uri: Uri, display: Option[String] = None): Either[Failure, Unit] = {
tx.run(
"""
|MERGE (parent:Resource {uri: {parentUri}})
|MERGE (directory:Resource {uri: {uri}})
|SET directory:Directory
|MERGE (directory)-[:PARENT]->(parent)
|
|SET directory.display = {display}
""".stripMargin,
parameters(
"parentUri", parentUri.value,
"uri", uri.value,
"display", display.orNull
)
)
markResourceAsExpandable(tx, parentUri)
Right(())
}
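// Merges the File and Blob nodes for an ingested file, links the blob to its MimeType and,
// for every extractor that has not already PROCESSED this blob with the same ingestion,
// languages and parentBlobs (and workspace properties, when present), creates a TODO
// relationship carrying the extractor's cost and priority. Workspace uploads get their
// priority multiplied by 100, presumably so interactive uploads are picked up ahead of
// bulk ingestions.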
def insertBlob(tx: StatementRunner, file: IngestionFile, uri: Uri, parentBlobs: List[Uri], mimeType: MimeType,
ingestion: String, languages: List[String], extractors: Iterable[Extractor], workspace: Option[WorkspaceItemContext]): Either[Failure, Unit] = {
def toParameterMap(e: Extractor): java.util.Map[String, Object] = {
Map[String, Object](
"name" -> e.name,
"indexing" -> Boolean.box(e.indexing),
"extractorPriority" -> Int.box(e.priority),
"priority" -> Int.box(if(workspace.nonEmpty) { e.priority * 100 } else { e.priority }),
"cost" -> Long.box(e.cost(mimeType, file.size)),
"external" -> Boolean.box(e.external)
).asJava
}
val maybeWorkspaceProperties = workspace.map { _ =>
"""
|,
|workspaceId: {workspaceId},
|workspaceNodeId: {workspaceNodeId},
|workspaceBlobUri: {workspaceBlobUri}
|""".stripMargin
}.getOrElse("")
tx.run(
s"""
|MATCH (parent:Resource {uri: {parentUri}})
|
|MERGE (file:File:Resource {uri: {fileUri}})
|MERGE (blob:Blob:Resource {uri: {blobUri}, size: {size}})
|MERGE (mimeType:MimeType {mimeType: {mimeType}})
|
|MERGE (parent)<-[:PARENT]-(file)
|MERGE (file)<-[:PARENT]-(blob)
|MERGE (blob)-[:TYPE_OF]-(mimeType)
|
|WITH {extractorParamsArray} as extractors
|UNWIND extractors as extractorParam
| MERGE (extractor :Extractor {name: extractorParam.name, indexing: extractorParam.indexing, priority: extractorParam.extractorPriority, external: extractorParam.external})
| WITH extractor, extractorParam.cost as cost, extractorParam.priority as priority
|
| MATCH (unprocessedBlob: Blob:Resource {uri: {blobUri}})
| WHERE
| NOT (unprocessedBlob)<-[:PROCESSED {
| ingestion: {ingestion},
| languages: {languages},
| parentBlobs: {parentBlobs}
| ${maybeWorkspaceProperties}
| } ]-(extractor)
|
| MERGE (unprocessedBlob)<-[todo:TODO {
| ingestion: {ingestion},
| languages: {languages},
| parentBlobs: {parentBlobs}
| ${maybeWorkspaceProperties}
| }]-(extractor)
| ON CREATE SET todo.cost = cost,
| todo.priority = priority,
| todo.attempts = 0
""".stripMargin,
parameters(
"parentUri", file.parentUri.value,
"fileUri", file.uri.value,
"blobUri", uri.value,
"size", file.size.asInstanceOf[java.lang.Long],
"mimeType", mimeType.mimeType,
"ingestion", ingestion,
"extractorParamsArray", extractors.map(toParameterMap).toArray,
"languages", languages.asJava,
"parentBlobs", parentBlobs.map(_.value).toArray,
"workspaceId", workspace.map(_.workspaceId).orNull,
"workspaceNodeId", workspace.map(_.workspaceNodeId).orNull,
"workspaceBlobUri", workspace.map(_.blobAddedToWorkspace).orNull
)
)
// This operation will have an effect when we've just added a CHILD to a blob.
// We look for a parent blob (and its parent file) and mark them as expandable.
markResourceAsExpandable(tx, file.parentUri)
// This operation will have an effect when we've just added a new PARENT to an existing expandable blob.
// When you add a new file parent to an expandable blob, you need to mark that parent isExpandable,
// because if the blob's already there, we won't be re-processing all its children,
// so the markResourceAsExpandable step won't do what we want.
// Note that this is a no-op if we've just added the blob for the first time,
// since we have yet to delve into its children and mark the blob as expandable if it has them.
markParentFileAsExpandableIfBlobIsExpandable(tx, blobUri = uri)
Right(())
}
def insertEmail(tx: StatementRunner, email: Email, parent: Uri): Either[Failure, Unit] = {
// KEEP IN MIND AN EMAIL RECORD IN THE MANIFEST IS A *RECEIVED* EMAIL NOT A SENT EMAIL SO THERE CAN BE MANY COPIES OF IT
tx.run(
"""
|MERGE (parent: Resource {uri: {parentUri}})
|
|MERGE (message:Resource {uri: {uri}})
|SET message:Email
|SET message.haveSource = true
|SET message.display = {subject}
|
|MERGE (parent)<-[:PARENT]-(message)
|
|FOREACH(refersToUri in {referenceUrisParam} |
| MERGE (refersTo:Email:Resource {uri: refersToUri})
| MERGE (refersTo)<-[:REFERENCED]-(message)
|)
|
|FOREACH(repliesToUri in {inReplyToUrisParam} |
| MERGE (reply:Email:Resource {uri: repliesToUri})
| MERGE (reply)<-[:IN_REPLY_TO]-(message)
|)
""".stripMargin,
parameters(
"parentUri", parent.value,
"uri", email.uri.value,
"subject", email.subject,
"referenceUrisParam", email.references.asJava,
"inReplyToUrisParam", email.inReplyTo.asJava
)
)
markResourceAsExpandable(tx, parent)
Right(())
}
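// Claims a batch of work for the named worker:
//  1. MERGE the Worker node.
//  2. Find TODOs whose blob is not locked and whose attempts are under the limit,
//     highest priority first, capped at maxBatchSize.
//  3. Use a Cypher reduce to keep adding tasks while the running cost stays below maxCost
//     (the first task is always kept, even if its cost alone exceeds maxCost).
//  4. Bump attempts on each selected TODO and lock its blob against this worker.
//
// Worked example of step 3 (illustrative numbers): with maxCost = 10 and candidate costs
// [4, 5, 3], the accumulated cost grows 0 -> 4 -> 9; then 9 + 3 >= 10, so the third task is
// dropped and only the first two are returned.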
override def fetchWork(workerName: String, maxBatchSize: Int, maxCost: Int): Either[Failure, List[WorkItem]] = transaction { tx =>
val summary = tx.run(
s"""
|MERGE (worker:Worker {name: {workerName}})
| WITH
| worker
|
|MATCH (e: Extractor)-[todo: TODO]->(b: Blob:Resource)
| WHERE
| NOT (b)-[:LOCKED_BY]->(:Worker) AND todo.attempts < {maxExtractionAttempts}
|
| WITH worker, todo, e, b
| // priority was originally defined only on extractors; we later extended it to todos as well
| // This maintains roll forward/backward compatibility with both
| ORDER BY coalesce(todo.priority, e.priority) DESC
| LIMIT {maxBatchSize}
|
|WITH collect({todo: todo, extractor: e, blob: b, worker: worker}) as allTasks
|WITH tail(reduce(acc = [0, []], task in allTasks |
| case
| when size(acc[1]) > 0 AND (acc[0] + task.todo.cost) >= {maxCost}
| then [acc[0], acc[1]]
| else
| [acc[0] + task.todo.cost, acc[1] + task]
| end
| )) as tasks
|
|UNWIND tasks[0] as task
| MATCH (blob: Blob:Resource { uri: task.blob.uri })-[:TYPE_OF]-(m: MimeType)
| MATCH (worker :Worker { name: task.worker.name })
|
| SET task.todo.attempts = task.todo.attempts + 1
| MERGE (blob)-[:LOCKED_BY]->(worker)
|
|RETURN
| blob,
| collect(m) as types,
| task.extractor.name as extractorName,
| task.todo.ingestion as ingestion,
| task.todo.languages as languages,
| task.todo.parentBlobs as parentBlobs,
| task.todo.workspaceId as workspaceId,
| task.todo.workspaceNodeId as workspaceNodeId,
| task.todo.workspaceBlobUri as workspaceBlobUri
""".stripMargin,
parameters(
"workerName", workerName,
"maxExtractionAttempts", Int.box(maxExtractionAttempts),
"maxBatchSize", Int.box(maxBatchSize),
"maxCost", Int.box(maxCost)
)
)
Right(summary.list().asScala.toList.map { r =>
val rawBlob = r.get("blob")
val mimeTypes = r.get("types").values()
val blob = Blob.fromNeo4jValue(rawBlob, mimeTypes.asScala.toSeq)
val extractorName = r.get("extractorName").asString()
val ingestion = r.get("ingestion").asString()
val workspaceId = r.get("workspaceId")
val workspaceNodeId = r.get("workspaceNodeId")
val workspaceBlobUri = r.get("workspaceBlobUri")
val workspace = if(workspaceId.isNull || workspaceNodeId.isNull || workspaceBlobUri.isNull) {
None
} else {
Some(WorkspaceItemContext(workspaceId.asString(), workspaceNodeId.asString(), workspaceBlobUri.asString()))
}
val rawLanguages = r.get("languages")
val rawParentBlobs = r.get("parentBlobs")
if(rawLanguages.isNull || rawParentBlobs.isNull) {
val message = s"NULL languages or parentBlobs! blob: ${blob}. extractorName: ${extractorName}. ingestion: ${ingestion}. rawParentBlobs: ${rawParentBlobs}. rawLanguages: ${rawLanguages}. workspaceId: ${workspaceId}. workspaceNodeId: ${workspaceNodeId}. workspaceBlobUri: ${workspaceBlobUri}"
logger.error(message)
throw new IllegalStateException(message)
} else {
val languages = rawLanguages.asList(_.asString).asScala.toList.flatMap(Languages.getByKey)
val parentBlobs: List[Uri] = rawParentBlobs.asList(_.asString, new java.util.ArrayList[String]()).asScala.toList.map(Uri(_))
WorkItem(blob, parentBlobs, extractorName, ingestion, languages, workspace)
}
})
}
// Called by the workers themselves once a batch of work is complete
override def releaseLocks(workerName: String): Either[Failure, Unit] = transaction { tx =>
logger.info(s"Releasing all locks for $workerName")
tx.run(
"""
|MATCH (resource :Resource)-[lock :LOCKED_BY]->(:Worker {name: {workerName}})
|DELETE lock
|WITH resource
|MATCH (resource :Resource)<-[todo:TODO]-(e:Extractor)
|WHERE NOT (resource)<-[:EXTRACTION_FAILURE]-(e)
|SET todo.attempts = 0
""".stripMargin,
parameters(
"workerName", workerName
)
)
Right(())
}
// Called by AWSWorkerControl running on the frontend (webapp) instances to ensure that workers terminated
// by a Riff-Raff deploy don't hold on to their work indefinitely and allow others to pick it up
def releaseLocksForTerminatedWorkers(runningWorkerNames: List[String]): Either[Failure, Unit] = transaction { tx =>
tx.run(
"""
|MATCH (resource :Resource)-[lock:LOCKED_BY]->(w:Worker)
|WHERE NOT w.name in {runningWorkerNames}
|DELETE lock
|WITH resource
|MATCH (resource :Resource)<-[todo:TODO]-(e:Extractor)
|WHERE NOT (resource)<-[:EXTRACTION_FAILURE]-(e)
|SET todo.attempts = 0
""".stripMargin,
parameters(
"runningWorkerNames", runningWorkerNames.asJava
)
)
Right(())
}
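// Completes work for an extractor flagged as external: the PROCESSING_EXTERNALLY relationship
// is replaced with a PROCESSED one, copying across the ingestion, languages, parentBlobs and
// any workspace properties. The counter check catches the relationship being missing
// (nothing deleted) or duplicated.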
override def markExternalAsComplete(uri: String, extractorName: String): Either[Failure, Unit] = transaction { tx =>
logger.info(s"Marking ${uri} / ${extractorName} as complete")
val result = tx.run(
s"""
|MATCH (b :Blob:Resource {uri: {uri}})<-[processing:PROCESSING_EXTERNALLY]-(e: Extractor {name: {extractorName}})
|
|MERGE (b)<-[processed:PROCESSED {
| ingestion: processing.ingestion,
| parentBlobs: processing.parentBlobs,
| languages: processing.languages
| }]-(e)
|
|FOREACH (_ IN CASE WHEN exists(processing.workspaceId) THEN [1] ELSE [] END |
| SET processed.workspaceId = processing.workspaceId
|)
|FOREACH (_ IN CASE WHEN exists(processing.workspaceBlobUri) THEN [1] ELSE [] END |
| SET processed.workspaceBlobUri = processing.workspaceBlobUri
|)
|FOREACH (_ IN CASE WHEN exists(processing.workspaceNodeId) THEN [1] ELSE [] END |
| SET processed.workspaceNodeId = processing.workspaceNodeId
|)
|DELETE processing
|""".stripMargin,
parameters(
"uri", uri,
"extractorName", extractorName,
)
)
val counters = result.summary().counters()
if (counters.relationshipsCreated() != 1 || counters.relationshipsDeleted() != 1) {
Left(IllegalStateFailure(s"Unexpected number of creates/deletes in markAsComplete. Created: ${counters.relationshipsCreated()}. Deleted: ${counters.relationshipsDeleted()}"))
} else {
Right(())
}
}
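// Swaps the matching TODO relationship for a PROCESSED one carrying the same extraction
// parameters. Exactly one relationship should be deleted and one created; anything else
// indicates the TODO was missing or duplicated, and is surfaced as an IllegalStateFailure.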
override def markAsComplete(params: ExtractionParams, blob: Blob, extractor: Extractor): Either[Failure, Unit] = transaction { tx =>
logger.info(s"Marking ${blob.uri.value} / ${extractor.name} as complete")
val maybeWorkspaceProperties = params.workspace.map { _ =>
"""
|,
|workspaceId: {workspaceId},
|workspaceNodeId: {workspaceNodeId},
|workspaceBlobUri: {workspaceBlobUri}
|""".stripMargin
}.getOrElse("")
val result = tx.run(
s"""
|MATCH (b :Blob:Resource {uri: {uri}})<-[todo:TODO {
| ingestion: {ingestion},
| parentBlobs: {parentBlobs},
| languages: {languages}
| ${maybeWorkspaceProperties}
|}]-(e: Extractor {name: {extractorName}})
|
|DELETE todo
|
|MERGE (b)<-[:PROCESSED {
| ingestion: {ingestion},
| parentBlobs: {parentBlobs},
| languages: {languages}
| ${maybeWorkspaceProperties}
|}]-(e)
|""".stripMargin,
parameters(
"uri", blob.uri.value,
"extractorName", extractor.name,
"ingestion", params.ingestion,
"languages", params.languages.map(_.key).asJava,
"parentBlobs", params.parentBlobs.map(_.value).asJava,
"workspaceId", params.workspace.map(_.workspaceId).orNull,
"workspaceNodeId", params.workspace.map(_.workspaceNodeId).orNull,
"workspaceBlobUri", params.workspace.map(_.blobAddedToWorkspace).orNull
)
)
val counters = result.summary().counters()
if(counters.relationshipsCreated() != 1 || counters.relationshipsDeleted() != 1) {
Left(IllegalStateFailure(s"Unexpected number of creates/deletes in markAsComplete. Created: ${counters.relationshipsCreated()}. Deleted: ${counters.relationshipsDeleted()}"))
} else {
Right(())
}
}
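// For extractors flagged as external the work leaves the TODO queue before it is finished:
// the TODO is swapped for a PROCESSING_EXTERNALLY relationship, which markExternalAsComplete
// later turns into PROCESSED.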
override def markExternalAsProcessing(params: ExtractionParams, blob: Blob, extractor: Extractor): Either[Failure, Unit] = transaction { tx =>
logger.info(s"Marking ${blob.uri.value} / ${extractor.name} as complete")
val maybeWorkspaceProperties = params.workspace.map { _ =>
"""
|,
|workspaceId: {workspaceId},
|workspaceNodeId: {workspaceNodeId},
|workspaceBlobUri: {workspaceBlobUri}
|""".stripMargin
}.getOrElse("")
val result = tx.run(
s"""
|MATCH (b :Blob:Resource {uri: {uri}})<-[todo:TODO {
| ingestion: {ingestion},
| parentBlobs: {parentBlobs},
| languages: {languages}
| ${maybeWorkspaceProperties}
|}]-(e: Extractor {name: {extractorName}})
|
|DELETE todo
|
|MERGE (b)<-[processing_externally:PROCESSING_EXTERNALLY {
| ingestion: {ingestion},
| parentBlobs: {parentBlobs},
| languages: {languages}
| ${maybeWorkspaceProperties}
|}]-(e)
| ON CREATE SET processing_externally.attempts = 0,
| processing_externally.cost = e.cost,
| processing_externally.priority = e.priority
|""".stripMargin,
parameters(
"uri", blob.uri.value,
"extractorName", extractor.name,
"ingestion", params.ingestion,
"languages", params.languages.map(_.key).asJava,
"parentBlobs", params.parentBlobs.map(_.value).asJava,
"workspaceId", params.workspace.map(_.workspaceId).orNull,
"workspaceNodeId", params.workspace.map(_.workspaceNodeId).orNull,
"workspaceBlobUri", params.workspace.map(_.blobAddedToWorkspace).orNull
)
)
val counters = result.summary().counters()
if (counters.relationshipsCreated() != 1 || counters.relationshipsDeleted() != 1) {
Left(IllegalStateFailure(s"Unexpected number of creates/deletes in markAsComplete. Created: ${counters.relationshipsCreated()}. Deleted: ${counters.relationshipsDeleted()}"))
} else {
Right(())
}
}
override def logExtractionFailure(blobUri: Uri, extractorName: String, stackTrace: String): Either[Failure, Unit] = transaction { tx =>
tx.run(
"""
|MATCH (b :Blob:Resource {uri: {blobUri}})
|MATCH (e: Extractor {name: {extractorName}})
|CREATE (b)<-[:EXTRACTION_FAILURE {stackTrace: {stackTrace}, at: {atTimeStamp}}]-(e)
""".stripMargin,
parameters(
"blobUri", blobUri.value,
"extractorName", extractorName,
"stackTrace", stackTrace,
"atTimeStamp", DateTime.now().getMillis.asInstanceOf[java.lang.Long]
)
)
Right(())
}
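// Groups extraction failures by (extractor name, stack trace) and counts the distinct blobs
// affected by each, most widespread failure first.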
override def getFailedExtractions: Either[Failure, ExtractionFailures] = transaction { tx =>
val summary = tx.run(
s"""
|MATCH (b:Blob)<-[f:EXTRACTION_FAILURE]-(e: Extractor)
|WITH DISTINCT { extractorName: e.name, stackTrace: f.stackTrace } as key, count(DISTINCT b) as numberOfBlobs
|RETURN key, numberOfBlobs
|ORDER BY numberOfBlobs DESC
""".stripMargin
)
val results = summary.list().asScala.toList
val failures = results.map { row =>
val extractorName = row.get("key").get("extractorName").asString()
val stackTrace = row.get("key").get("stackTrace").asString()
val numberOfBlobs = row.get("numberOfBlobs").asLong()
ExtractionFailureSummary(extractorName, stackTrace, numberOfBlobs)
}
Right(ExtractionFailures(failures))
}
def getResourcesForExtractionFailure(extractorName: String, stackTrace: String, page: Long, skip: Long, pageSize: Long): Either[Failure, ResourcesForExtractionFailure] = transaction { tx =>
val summary = tx.run(
s"""
|MATCH (b:Blob)<-[f:EXTRACTION_FAILURE { stackTrace: {stackTrace} }]-(e: Extractor { name: { extractorName} })
|WITH count(DISTINCT b) as count
|
|MATCH (b:Blob)<-[f:EXTRACTION_FAILURE { stackTrace: {stackTrace} }]-(e: Extractor { name: { extractorName} })
|WITH collect(b) as blobs, count
|UNWIND blobs as blob
| OPTIONAL MATCH (parent: Resource)<-[:PARENT]-(blob)
| OPTIONAL MATCH (blob)<-[:PARENT]-(child: Resource)
|
|RETURN blob, collect(parent) as parents, collect(child) as children, count
|SKIP {skip}
|LIMIT {pageSize}
""".stripMargin,
parameters(
"extractorName", extractorName,
"stackTrace", stackTrace,
"skip", Long.box(skip),
"pageSize", Long.box(pageSize)
)
)
val results = summary.list().asScala.toList
val count = results.headOption.map(_.get("count").asLong()).getOrElse(0L)
val resources = results.map { row =>
val blob = row.get("blob")
val parents = row.get("parents").asList((v: Value) => v).asScala.toList
val children = row.get("children").asList((v: Value) => v).asScala.toList
BasicResource.fromNeo4jValues(blob, parents, children)
}
val result = ResourcesForExtractionFailure(count, page, pageSize, resources)
Right(result)
}
override def getFilterableMimeTypes: Either[Failure, List[MimeType]] = transaction { tx =>
val summary = tx.run(
"""
|MATCH (m: MimeType)
| WHERE (m)<-[:TYPE_OF]-(:Blob)<-[:PROCESSED]-(:Extractor {indexing: true})
|
|RETURN m""".stripMargin)
val results = summary.list().asScala.toList
Right(results.map(r => MimeType.fromNeo4jValue(r.get("m"))))
}
override def getAllMimeTypes: Attempt[List[MimeType]] = attemptTransaction { tx =>
val statementResult = tx.run(
"""
|MATCH (m: MimeType)
|
|RETURN m""".stripMargin)
for {
summary <- statementResult
results = summary.list().asScala.toList
} yield {
results.map(r => MimeType.fromNeo4jValue(r.get("m")))
}
}
override def getMimeTypesCoverage: Either[Failure, List[MimeTypeCoverage]] = transaction { tx =>
val counts = tx.run(
"""
|MATCH (mimeType:MimeType)
|WITH collect(mimeType) as types
|UNWIND types as type
| MATCH (type)<-[:TYPE_OF]-(b:Blob)
| WITH type, count(b) as total
| OPTIONAL MATCH (type)<-[:TYPE_OF]-(b:Blob)<-[todo:TODO { attempts: 0 }]-(:Extractor)
| WITH type, total, count(todo) as todo
| OPTIONAL MATCH (type)<-[:TYPE_OF]-(b:Blob)<-[done:PROCESSED]-(:Extractor)
| WITH type, total, todo, count(done) as done
| OPTIONAL MATCH (type)<-[:TYPE_OF]-(:Blob)<-[failed:EXTRACTION_FAILURE]-(:Extractor)
| WITH type, total, todo, done, count(failed) as failed
|RETURN type, total, todo, done, failed
""".stripMargin
)
val coverageList = counts.list().asScala.toList.map { record =>
val mimeType = MimeType.fromNeo4jValue(record.get("type"))
MimeTypeCoverage(
mimeType,
total = record.get("total").asLong(),
todo = record.get("todo").asLong(),
done = record.get("done").asLong(),
failed = record.get("failed").asLong(),
humanReadableMimeType = MimeDetails.displayMap.get(mimeType.mimeType).map(_.display)
)
}
Right(coverageList)
}
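// Returns the requested email plus every email reachable within 5 IN_REPLY_TO/REFERENCED hops,
// each paired with its outgoing neighbours in the thread. The UNION guarantees the root email
// is returned even when it has no thread at all.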
def getEmailThread(uri: String): Attempt[List[EmailNeighbours]] = attemptTransaction { tx =>
val attemptResult = tx.run(
"""
| MATCH (root:Email:Resource {uri: {uri}})
| OPTIONAL MATCH (root)-[r]->(adjacent)
| WHERE type(r) in ['IN_REPLY_TO', 'REFERENCED']
| RETURN
| root as node,
| CASE WHEN adjacent IS NOT NULL THEN
| collect(DISTINCT { relation: type(r), uri: adjacent.uri})
| ELSE
| []
| END as neighbours
|UNION
| MATCH (root:Email:Resource {uri: {uri}})
| OPTIONAL MATCH p=(root)-[*0..5]-(e2:Email)
| WHERE ALL (rs in relationships(p) WHERE type(rs) in ['IN_REPLY_TO', 'REFERENCED'])
| WITH DISTINCT e2, root
| OPTIONAL MATCH (e2)-[r]->(adjacent:Email)
| WHERE type(r) in ['IN_REPLY_TO', 'REFERENCED']
| RETURN
| e2 as node,
| CASE WHEN adjacent IS NOT NULL THEN
| collect(DISTINCT { relation: type(r), uri: adjacent.uri})
| ELSE
| []
| END as neighbours
""".stripMargin,
"uri" -> uri
)
attemptResult.flatMap { result =>
Attempt.traverse(result.list().asScala.toList) { record =>
val node = record.get("node")
val neighbours = record.get("neighbours")
EmailNeighbours.fromValues(node, neighbours)
}
}
}
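// Applies a batch of ingestion insertions (directories, blobs, emails) in a single transaction
// and then stamps endTime on the root resource, typically the ingestion the batch belongs to.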
override def insert(events: Seq[Manifest.Insertion], rootUri: Uri): Either[Failure, Unit] = transaction { tx =>
def insertions() = events.toList.traverse {
case Manifest.InsertDirectory(parentUri, uri) =>
insertDirectory(tx, parentUri = parentUri, uri = uri)
case Manifest.InsertBlob(file, blobUri, parentBlobs, mimeType, ingestion, languages, extractors, workspace) =>
insertBlob(tx, file, blobUri, parentBlobs, mimeType, ingestion, languages, extractors, workspace)
case Manifest.InsertEmail(email, parent) =>
insertEmail(tx, email, parent)
}
def setEndTimeIfComplete() = {
tx.run(
"""
|MATCH (root: Resource {uri: {uri}})
|SET root.endTime = {endTime}
""".stripMargin,
parameters(
"uri", rootUri.value,
"endTime", Instant.now.toEpochMilli.asInstanceOf[java.lang.Long]
)
)
Right(())
}
for {
_ <- insertions()
_ <- setEndTimeIfComplete()
} yield ()
}
override def rerunSuccessfulExtractorsForBlob(uri: Uri): Attempt[Unit] = attemptTransaction { tx =>
// Re-run extractors that successfully ran before.
// Effectively, re-label all the blob's PROCESSED relations as TODO relations.
// We do this by copying properties onto new TODOs and then deleting the PROCESSEDs.
//
// If we had APOC (neo4j >= 3.5), we could do it a bit more simply:
//
// MATCH (n :Blob {uri: uri})-[p :PROCESSED]->(m)
// WITH collect(p) as processedRelations
// CALL apoc.refactor.rename.type("PROCESSED", "TODO", processedRelations)
// YIELD committedOperations
// RETURN committedOperations
tx.run(
"""
|MATCH (blob :Blob:Resource {uri: {uri}})<-[p :PROCESSED]-(e: Extractor)
|
|WITH blob, collect({relation: p, extractor: e}) as processedRelationsWithExtractors, count(p) as originalNumberOfProcessedRelations
|UNWIND processedRelationsWithExtractors as processed
|
| WITH processed.relation as processedRelation, processed.extractor as extractor, blob, originalNumberOfProcessedRelations
| // CREATE not MERGE
| // because if we have multiple PROCESSED between the same blob & extractor
| // (e.g. because it was uploaded to multiple workspaces)
| // we need to create a TODO for each so we preserve data from each (e.g. the workspace uploaded to).
| // A MERGE won't do this, because we don't have any distinguishing properties on the TODO in the path.
| // We SET them afterwards, because we want to copy properties from the PROCESSED
| // https://neo4j.com/docs/developer-manual/3.3/cypher/clauses/set/#set-copying-properties-between-nodes-and-relationships.
| CREATE (blob)<-[todoForRedoingPreviousSuccess :TODO]-(extractor)
| SET todoForRedoingPreviousSuccess = processedRelation
| SET todoForRedoingPreviousSuccess.attempts = 0
| SET todoForRedoingPreviousSuccess.priority = extractor.priority
| SET todoForRedoingPreviousSuccess.cost = extractor.cost
| DELETE processedRelation
|
| RETURN originalNumberOfProcessedRelations
|""".stripMargin,
parameters(
"uri", uri.value
)
).flatMap(result => {
val counters = result.summary().counters()
val relationshipsCreated = counters.relationshipsCreated()
val relationshipsDeleted = counters.relationshipsDeleted()
val propertiesSet = counters.propertiesSet()
val originalNumberOfProcessedRelations = result
.list()
.asScala
.map(_.get("originalNumberOfProcessedRelations").asInt())
.headOption
.getOrElse(0)
if (relationshipsCreated != originalNumberOfProcessedRelations) {
Attempt.Left(IllegalStateFailure(
s"When re-running successful extractors for blob ${uri.value}, ${relationshipsCreated} TODO relations were created and ${originalNumberOfProcessedRelations} PROCESSED relations deleted. These should be equal"
))
} else {
logger.info(
s"When re-running successful extractors for blob ${uri.value}, ${relationshipsCreated} relations created, ${propertiesSet} properties set and ${relationshipsDeleted} relations deleted"
)
Attempt.Right(())
}
})
}
override def rerunFailedExtractorsForBlob(uri: Uri): Attempt[Unit] = attemptTransaction { tx =>
// Re-run extractors that failed.
// Failed extractors leave in place a TODO with attempts > 0 and an EXTRACTION_FAILURE.
// So we delete the EXTRACTION_FAILUREs and set attempts to 0 on the TODOs.
tx.run(
"""
|MATCH (blob :Blob:Resource {uri: {uri}})<-[failure :EXTRACTION_FAILURE]-(failedExtractor :Extractor)
|DELETE failure
|
|// we need DISTINCT because if there are multiple failures
|// we'll get the blob and failedExtractor duplicated
|WITH DISTINCT blob, failedExtractor
|MATCH (blob)<-[todo :TODO]-(failedExtractor)
|WHERE todo.attempts > 0
|SET todo.attempts = 0
|""".stripMargin,
parameters(
"uri", uri.value
)
).flatMap(result => {
val counters = result.summary().counters()
val relationshipsCreated = counters.relationshipsCreated()
val relationshipsDeleted = counters.relationshipsDeleted()
val propertiesSet = counters.propertiesSet()
if (propertiesSet != relationshipsDeleted) {
Attempt.Left(IllegalStateFailure(
s"When re-running failed extractors for blob ${uri.value}, ${relationshipsDeleted} EXTRACTION_FAILURE relations were deleted and ${propertiesSet} TODOs had their attempts reset to 0. These should be equal"
))
} else {
logger.info(
s"When re-running failed extractors for blob ${uri.value}, ${relationshipsCreated} relations created, ${propertiesSet} properties set and ${relationshipsDeleted} relations deleted"
)
Attempt.Right(())
}
})
}
override def getBlob(uri: Uri): Either[Failure, Blob] = transaction { tx =>
val summary = tx.run(
"""
|MATCH (b :Blob:Resource {uri: {uri}})-[:TYPE_OF]->(m :MimeType)
|RETURN b, m""".stripMargin,
parameters(
"uri", uri.value
))
val results = summary.list().asScala.toList
results match {
case Nil =>
Left(NotFoundFailure(s"Blob ${uri.value} not found"))
case head :: tail =>
val mimeTypes = (head :: tail).map(_.get("m"))
Right(Blob.fromNeo4jValue(head.get("b"), mimeTypes))
}
}
override def getBlobsForFiles(fileUris: List[String]): Either[Failure, Map[String, Blob]] = transaction { tx =>
val result = tx.run(
"""
|MATCH (b: Blob:Resource)-[:PARENT]->(f: File:Resource)
|WHERE f.uri IN {fileUris}
|RETURN b,f
""".stripMargin,
parameters(
"fileUris", fileUris.asJava
)
)
Right(result.iterator.foldLeft(Map.empty[String, Blob]) { (acc, record) =>
val uri = record.get("f").get("uri").asString()
val blob = Blob.fromNeo4jValue(record.get("b"), Seq.empty)
acc + (uri -> blob)
})
}
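// inProgress counts TODOs whose blob is currently locked by a worker; outstanding counts
// unlocked TODOs that are still under the attempt limit and have no recorded extraction failure.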
override def getWorkCounts(): Either[Failure, WorkCounts] = transaction { tx =>
val result = tx.run(
"""
|OPTIONAL MATCH (:Extractor)-[t:TODO]->(b:Blob)
|WHERE (b)-[:LOCKED_BY]->(:Worker)
|WITH count(t) as inProgress
|
|OPTIONAL MATCH (:Extractor)-[t:TODO]->(b:Blob)
|WHERE t.attempts < {maxExtractionAttempts} AND NOT (b)-[:LOCKED_BY]->(:Worker) AND NOT (b)<-[:EXTRACTION_FAILURE]-(:Extractor)
|RETURN inProgress, count(t) as outstanding
""".stripMargin,
parameters(
"maxExtractionAttempts", Int.box(maxExtractionAttempts),
)
).single()
val inProgress = result.get("inProgress").asInt()
val outstanding = result.get("outstanding").asInt()
Right(WorkCounts(inProgress, outstanding))
}
// Returns an empty list (rather than a failure) if the blob has not been
// processed with OcrMyPdfExtractor
override def getLanguagesProcessedByOcrMyPdf(uri: Uri): Attempt[List[Language]] = attemptTransaction { tx =>
tx.run(
"""
|MATCH (r: Resource { uri: {uri} } )<-[p:PROCESSED]-(e :Extractor {name: "OcrMyPdfExtractor"})
|RETURN p.languages as languages
""".stripMargin,
parameters(
"uri", uri.value
)
).map { queryResultSummary =>
val results = queryResultSummary.list().asScala.toList
results.headOption.toList.flatMap(r =>
r.get("languages")
.asList((v: Value) => v.asString())
.asScala
.toList
.flatMap(Languages.getByKey)
)
}
}
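// Shared helper for the delete operations below: runs the given query with the uri parameter
// and validates the number of deleted nodes with the supplied predicate.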
private def processDelete(uri: Uri, query: String, correctResultsCount: Int => Boolean, errorText: String): Attempt[Unit] = attemptTransaction { tx =>
tx.run(
query,
parameters(
"uri", uri.value
)).flatMap { result: StatementResult =>
val counters = result.summary().counters()
if (correctResultsCount(counters.nodesDeleted())) Attempt.Right(())
else {
Attempt.Left(IllegalStateFailure(s"$errorText of blob $uri. Nodes deleted: ${counters.nodesDeleted()}"))
}
}
}
// If this blob has children, the neo4j structure beneath it (down to
// the next :Blob descendant) will be deleted. This leaves orphaned blobs
// in neo4j, elasticsearch and S3, but it is expected that it will be called either:
// 1. from the UI, in which case the operation is disallowed if it has children
// 2. as part of deleting an ingestion or collection, in which case those
// orphaned blobs will have been returned from the initial getBlobs ES query
// and will therefore be deleted separately as we loop through them.
def deleteBlob(uri: Uri): Attempt[Unit] = processDelete(
uri,
// OPTIONAL MATCH so if there's no Workspace node pointing to it,
// we still delete the blob, and vice versa. (The vice versa would be
// unexpected but maybe you're clearing up a blob that was only partially deleted before).
"""
|OPTIONAL MATCH (w: WorkspaceNode { uri: { uri }})
|OPTIONAL MATCH (b: Blob:Resource { uri: { uri }})-[:PARENT]->(f: File)
|OPTIONAL MATCH (descendant :Resource)
| WHERE descendant.uri STARTS WITH {uri}
|DETACH DELETE b, f, w, descendant
""".stripMargin,
// Always consider the deletion a success.
// We can't set a lower bound, because if nothing has been deleted,
// the deletion may have been triggered from a list of results coming
// back from Elasticsearch (which is eventually consistent so doesn't
// immediately show that the delete happened).
// TODO MRB: handle the above more gracefully at a higher level, it's a hack down here
// And we can't set an upper bound, because there will be an indeterminate
// number of descendants deleted.
count => true,
"Error deleting blob")
// This will delete descendants down to the next blobs,
// at which point the URL pattern starts again.
def deleteResourceAndDescendants(uri: Uri): Attempt[Unit] = attemptTransaction { tx =>
tx.run(
"""
|MATCH (r: Resource)
|WHERE r.uri STARTS WITH {uri}
|DETACH DELETE r
""".stripMargin,
parameters(
"uri", uri.value
)
).flatMap { result =>
val counters = result.summary().counters()
if(counters.nodesDeleted() < 1) {
Attempt.Left(IllegalStateFailure(s"Error deleting ingestion $uri. Nodes deleted: ${counters.nodesDeleted()}"))
} else {
Attempt.Right(())
}
}
}
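// Records a free-text progress note on the extractor's TODO relationship, presumably so
// long-running extractions can report intermediate status. Exactly one property should be set.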
override def setProgressNote(blobUri: Uri, extractor: Extractor, note: String): Either[Failure, Unit] = transaction { tx =>
val result = tx.run(
"""
|MATCH (b :Blob :Resource { uri: {blobUri} })<-[todo:TODO]-(:Extractor { name: {extractorName} })
|SET todo.note = {note}
|""".stripMargin,
parameters(
"blobUri", blobUri.value,
"extractorName", extractor.name,
"note", note
)
)
val counters = result.summary().counters()
if(counters.propertiesSet() != 1) {
Left(IllegalStateFailure(s"Error in setProgressNote: unexpected properties set ${counters.propertiesSet()}"))
} else {
Right(())
}
}
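// Looks up children of the given workspace node that carry the given uri and wraps them as
// IngestFileResults. Returns a NotFoundFailure when no workspace context was supplied.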
override def getWorkspaceChildrenWithUri(workspaceContext: Option[WorkspaceItemUploadContext], childUri: String) = attemptTransaction { tx =>
if (workspaceContext.isDefined) {
tx.run(
"""
|match (workspaceNode:WorkspaceNode {id: {workspaceNodeId} })<-[:PARENT]-(child:WorkspaceNode {uri: {childUri} })
|return child
|""".stripMargin,
parameters(
"workspaceNodeId", workspaceContext.get.workspaceParentNodeId,
"childUri", childUri
)
).map { result: StatementResult =>
val children = result.list().asScala.toList
val childrenUris = children.map { c =>
val node = c.get("child")
val uri = node.get("uri").asString()
val mimeType = node.get("mimeType").asString()
val size = node.get("size").asLong()
val id = node.get("id").asString()
IngestFileResult(Blob(Uri(uri), size, Set(MimeType(mimeType))), Some(id))
}
childrenUris
}
} else {
Attempt.Left(NotFoundFailure(s"No children with this"))
}
}
}