in backend/app/services/manifest/Neo4jManifest.scala [923:985]
/**
  * Re-queues every extractor that previously ran successfully against a blob.
  *
  * Each PROCESSED relation between the blob and an extractor is replaced by a new TODO
  * relation that carries over the PROCESSED relation's properties, with `attempts` reset
  * to 0 and `priority` / `cost` taken from the extractor node.
  *
  * @param uri the URI of the blob whose successful extractors should be re-run
  * @return `Right(())` when the number of TODO relations created and the number of
  *         PROCESSED relations deleted both equal the number of PROCESSED relations
  *         that existed beforehand; otherwise a `Left(IllegalStateFailure)`
  */
override def rerunSuccessfulExtractorsForBlob(uri: Uri): Attempt[Unit] = attemptTransaction { tx =>
  // Re-run extractors that successfully ran before.
  // Effectively, re-label all the blob's PROCESSED relations as TODO relations.
  // We do this by copying properties onto new TODOs and then deleting the PROCESSEDs.
  //
  // If we had APOC (neo4j >= 3.5), we could do it a bit more simply:
  //
  // MATCH (n :Blob {uri: uri})<-[p :PROCESSED]-(m)
  // WITH collect(p) as processedRelations
  // CALL apoc.refactor.rename.type("PROCESSED", "TODO", processedRelations)
  // YIELD committedOperations
  // RETURN committedOperations
  tx.run(
    """
      |MATCH (blob :Blob:Resource {uri: {uri}})<-[p :PROCESSED]-(e: Extractor)
      |
      |WITH blob, collect({relation: p, extractor: e}) as processedRelationsWithExtractors, count(p) as originalNumberOfProcessedRelations
      |UNWIND processedRelationsWithExtractors as processed
      |
      | WITH processed.relation as processedRelation, processed.extractor as extractor, blob, originalNumberOfProcessedRelations
      | // CREATE not MERGE
      | // because if we have multiple PROCESSED between the same blob & extractor
      | // (e.g. because it was uploaded to multiple workspaces)
      | // we need to create a TODO for each so we preserve data from each (e.g. the workspace uploaded to).
      | // A MERGE won't do this, because we don't have any distinguishing properties on the TODO in the path.
      | // We SET them afterwards, because we want to copy properties from the PROCESSED
      | // https://neo4j.com/docs/developer-manual/3.3/cypher/clauses/set/#set-copying-properties-between-nodes-and-relationships.
      | CREATE (blob)<-[todoForRedoingPreviousSuccess :TODO]-(extractor)
      | SET todoForRedoingPreviousSuccess = processedRelation
      | SET todoForRedoingPreviousSuccess.attempts = 0
      | SET todoForRedoingPreviousSuccess.priority = extractor.priority
      | SET todoForRedoingPreviousSuccess.cost = extractor.cost
      | DELETE processedRelation
      |
      | RETURN originalNumberOfProcessedRelations
      |""".stripMargin,
    parameters(
      "uri", uri.value
    )
  ).flatMap { result =>
    val counters = result.summary().counters()
    val relationshipsCreated = counters.relationshipsCreated()
    val relationshipsDeleted = counters.relationshipsDeleted()
    val propertiesSet = counters.propertiesSet()

    // Only the first returned row is read for the pre-existing count.
    // An empty result (nothing matched) is treated as zero relations.
    val originalNumberOfProcessedRelations = result
      .list()
      .asScala
      .headOption
      .map(_.get("originalNumberOfProcessedRelations").asInt())
      .getOrElse(0)

    // Every PROCESSED relation must have been swapped for exactly one TODO relation:
    // both the created and the deleted counters have to match the original count.
    val countsAreConsistent =
      relationshipsCreated == originalNumberOfProcessedRelations &&
        relationshipsDeleted == originalNumberOfProcessedRelations

    if (countsAreConsistent) {
      logger.info(
        s"When re-running successful extractors for blob ${uri.value}, ${relationshipsCreated} relations created, ${propertiesSet} properties set and ${relationshipsDeleted} relations deleted"
      )
      Attempt.Right(())
    } else {
      Attempt.Left(IllegalStateFailure(
        s"When re-running successful extractors for blob ${uri.value}, ${relationshipsCreated} TODO relations were created and ${relationshipsDeleted} PROCESSED relations were deleted, but ${originalNumberOfProcessedRelations} PROCESSED relations originally existed. These should all be equal"
      ))
    }
  }
}