in s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticEdgeMutator.scala [93:151]
override def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
def mutateEdgesInner(edges: Seq[S2EdgeLike],
checkConsistency: Boolean,
withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
assert(edges.nonEmpty)
// TODO:: remove after code review: unreachable code
if (!checkConsistency) {
val futures = edges.map { edge =>
val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
val mutations =
io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
writeToStorage(zkQuorum, mutations, withWait)
}
Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
} else {
optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
}
}
}
val edgeWithIdxs = _edges.zipWithIndex
val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
(edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
} toSeq
val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
// After deleteAll, process others
val mutateEdgeFutures = edges.toList match {
case head :: tail =>
val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
//TODO: decide what we will do on failure on vertex put
val puts = io.buildVertexPutsAsync(head)
val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
Seq(edgeFuture, vertexFuture)
case Nil => Nil
}
val composed = for {
// deleteRet <- Future.sequence(deleteAllFutures)
mutateRet <- Future.sequence(mutateEdgeFutures)
} yield mutateRet
composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
}
Future.sequence(mutateEdges).map { squashedRets =>
squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
}
}