override def rerunSuccessfulExtractorsForBlob()

in backend/app/services/manifest/Neo4jManifest.scala [923:985]


  /**
    * Re-queues every extractor that previously ran successfully against a blob.
    *
    * Each PROCESSED relation between the blob and an extractor is replaced by a new TODO
    * relation that copies the PROCESSED relation's properties, resets `attempts` to 0 and
    * takes `priority` and `cost` from the extractor node.
    *
    * @param uri uri of the blob whose successful extractors should be re-run
    * @return `Attempt.Right(())` when the number of TODO relations created and the number of
    *         PROCESSED relations deleted both equal the number of PROCESSED relations found;
    *         otherwise `Attempt.Left` with an `IllegalStateFailure`
    */
  override def rerunSuccessfulExtractorsForBlob(uri: Uri): Attempt[Unit] = attemptTransaction { tx =>
    // Re-run extractors that successfully ran before.
    // Effectively, re-label all the blob's PROCESSED relations as TODO relations.
    // We do this by copying properties onto new TODOs and then deleting the PROCESSEDs.
    //
    // If we had APOC (neo4j >= 3.5), we could do it a bit more simply:
    //
    // MATCH (n :Blob {uri: uri})-[p :PROCESSED]->(m)
    // WITH collect(p) as processedRelations
    // CALL apoc.refactor.rename.type("PROCESSED", "TODO", processedRelations)
    // YIELD committedOperations
    // RETURN committedOperations
    tx.run(
      """
        |MATCH (blob :Blob:Resource {uri: {uri}})<-[p :PROCESSED]-(e: Extractor)
        |
        |WITH blob, collect({relation: p, extractor: e}) as processedRelationsWithExtractors, count(p) as originalNumberOfProcessedRelations
        |UNWIND processedRelationsWithExtractors as processed
        |
        |  WITH processed.relation as processedRelation, processed.extractor as extractor, blob, originalNumberOfProcessedRelations
        |  // CREATE not MERGE
        |  // because if we have multiple PROCESSED between the same blob & extractor
        |  // (e.g. because it was uploaded to multiple workspaces)
        |  // we need to create a TODO for each so we preserve data from each (e.g. the workspace uploaded to).
        |  // A MERGE won't do this, because we don't have any distinguishing properties on the TODO in the path.
        |  // We SET them afterwards, because we want to copy properties from the PROCESSED
        |  // https://neo4j.com/docs/developer-manual/3.3/cypher/clauses/set/#set-copying-properties-between-nodes-and-relationships.
        |  CREATE (blob)<-[todoForRedoingPreviousSuccess :TODO]-(extractor)
        |  SET todoForRedoingPreviousSuccess = processedRelation
        |  SET todoForRedoingPreviousSuccess.attempts = 0
        |  SET todoForRedoingPreviousSuccess.priority = extractor.priority
        |  SET todoForRedoingPreviousSuccess.cost = extractor.cost
        |  DELETE processedRelation
        |
        |  RETURN originalNumberOfProcessedRelations
        |""".stripMargin,
      parameters(
        "uri", uri.value
      )
    ).flatMap { result =>
      val counters = result.summary().counters()
      val relationshipsCreated = counters.relationshipsCreated()
      val relationshipsDeleted = counters.relationshipsDeleted()
      val propertiesSet = counters.propertiesSet()

      // The count is aggregated per blob before the UNWIND, so it is repeated on every row;
      // only the first row is read. No rows means no PROCESSED relation matched, i.e. 0.
      val originalNumberOfProcessedRelations = result
        .list()
        .asScala
        .headOption
        .map(_.get("originalNumberOfProcessedRelations").asInt())
        .getOrElse(0)

      // Every PROCESSED relation must have been swapped for exactly one TODO relation:
      // both the creations and the deletions have to match the number we started with.
      val countsAreConsistent =
        relationshipsCreated == originalNumberOfProcessedRelations &&
          relationshipsDeleted == originalNumberOfProcessedRelations

      if (countsAreConsistent) {
        logger.info(
          s"When re-running successful extractors for blob ${uri.value}, ${relationshipsCreated} relations created, ${propertiesSet} properties set and ${relationshipsDeleted} relations deleted"
        )
        Attempt.Right(())
      } else {
        // NOTE(review): presumably attemptTransaction does not commit when a Left is returned — confirm.
        Attempt.Left(IllegalStateFailure(
          s"When re-running successful extractors for blob ${uri.value}, ${relationshipsCreated} TODO relations were created and ${relationshipsDeleted} PROCESSED relations were deleted, but ${originalNumberOfProcessedRelations} PROCESSED relations were found. These should all be equal"
        ))
      }
    }
  }