in s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala [364:408]
/**
 * Mutates a batch of edges and returns the outcomes as [[MutateResponse]]s sorted by the
 * position of the originating edge in `edges`.
 *
 * Each edge is routed to one of three paths:
 *  - "strong" edges are those whose label has `consistencyLevel == "strong"` and whose
 *    operation is not `insertBulk`; everything else is treated as "weak".
 *  - Weak edges are grouped by the label's `hbaseZkAddr` and then by label, and every group
 *    is handed to that label's mutator in a single `mutateWeakEdges` call.
 *  - Strong edges whose operation is `deleteAll` are each passed to `deleteAllAdjacentEdges`.
 *  - The remaining strong edges are grouped by label and sent to `mutateStrongEdges`.
 *
 * All three sets of futures are created before they are combined, so none of them waits for
 * another to be started.
 *
 * @param edges    edges to mutate; their positions define the order of the responses
 * @param withWait forwarded to `mutateWeakEdges` only; `mutateStrongEdges` is always called
 *                 with `withWait = true`
 * @return a future holding the responses ordered by original edge position
 */
override def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
  val edgeWithIdxs = edges.zipWithIndex

  val (strongEdges, weakEdges) = edgeWithIdxs.partition { case (edge, _) =>
    edge.innerLabel.consistencyLevel == "strong" && edge.getOp() != GraphUtil.operations("insertBulk")
  }

  val weakEdgesFutures = weakEdges.groupBy { case (edge, _) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, quorumGroup) =>
    val futures = quorumGroup.groupBy(_._1.innerLabel).map { case (label, edgeGroup) =>
      val mutator = getEdgeMutator(label)
      val groupEdges = edgeGroup.map(_._1)
      // Indexed so that translating a group-local position back is O(1) per result.
      val idxs = edgeGroup.map(_._2).toIndexedSeq

      /* multiple edges with weak consistency level will be processed as batch */
      mutator.mutateWeakEdges(zkQuorum, groupEdges, withWait).map { rets =>
        // The mutator only sees this group's edges, so the index it reports is taken to be a
        // position within `groupEdges`, not within the caller's `edges`. Translate it back so
        // the final sort orders weak results consistently with the strong/deleteAll results.
        // NOTE(review): confirm that every EdgeMutator implementation reports group-local
        // positions. An out-of-range index is passed through unchanged (previous behaviour).
        rets.map { case (localIdx, ret) => idxs.lift(localIdx).getOrElse(localIdx) -> ret }
      }
    }
    Future.sequence(futures)
  }

  val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, _) =>
    edge.getOp() == GraphUtil.operations("deleteAll")
  }

  val deleteAllFutures = strongDeleteAll.map { case (edge, idx) =>
    deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.getDir(), edge.ts).map(idx -> _)
  }

  val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, _) => edge.innerLabel }.map { case (label, edgeGroup) =>
    val groupEdges = edgeGroup.map(_._1)
    val idxs = edgeGroup.map(_._2)
    val mutator = getEdgeMutator(label)
    val zkQuorum = label.hbaseZkAddr

    // Strong mutations always wait, independent of the caller's `withWait`.
    mutator.mutateStrongEdges(zkQuorum, groupEdges, withWait = true).map { rets =>
      idxs.zip(rets)
    }
  }

  for {
    weak <- Future.sequence(weakEdgesFutures)
    deleteAll <- Future.sequence(deleteAllFutures)
    strong <- Future.sequence(strongEdgesFutures)
  } yield {
    // Every entry is (original position, result); sorting restores the caller's edge order.
    (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(r => new MutateResponse(r._2))
  }
}