override def mutateStrongEdges()

in s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticEdgeMutator.scala [93:151]


  override def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
    def mutateEdgesInner(edges: Seq[S2EdgeLike],
                         checkConsistency: Boolean,
                         withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
      assert(edges.nonEmpty)
      // TODO:: remove after code review: unreachable code
      if (!checkConsistency) {

        val futures = edges.map { edge =>
          val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))

          val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
          val mutations =
            io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr

          if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)

          writeToStorage(zkQuorum, mutations, withWait)
        }
        Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
      } else {
        optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
          conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
        }
      }
    }

    val edgeWithIdxs = _edges.zipWithIndex
    val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
      (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
    } toSeq

    val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
      val edges = edgeGroup.map(_._1)
      val idxs = edgeGroup.map(_._2)
      // After deleteAll, process others
      val mutateEdgeFutures = edges.toList match {
        case head :: tail =>
          val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)

          //TODO: decide what we will do on failure on vertex put
          val puts = io.buildVertexPutsAsync(head)
          val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
          Seq(edgeFuture, vertexFuture)
        case Nil => Nil
      }

      val composed = for {
        //        deleteRet <- Future.sequence(deleteAllFutures)
        mutateRet <- Future.sequence(mutateEdgeFutures)
      } yield mutateRet

      composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
    }

    Future.sequence(mutateEdges).map { squashedRets =>
      squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
    }
  }