in s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala [582:640]
/**
 * Applies a batch of requested edge operations on top of the existing edge state and
 * returns the resulting edge together with the mutation that produces it.
 *
 * A merge function is chosen for every request from its operation code (and, for
 * `insert` / `delete`, from the label's consistency level). This selection runs for all
 * requests up front, so an unsupported request fails the whole call even if it would
 * have been dropped later. Requests whose timestamp equals the existing edge's timestamp
 * are then discarded, and the remaining ones are merged in ascending timestamp order.
 *
 * @param invertedEdge the existing edge, if any; supplies the starting property state,
 *                     the timestamp used to discard requests, and the base version
 * @param requestEdges the requested edge operations to merge
 * @return `(requestEdges.head, EdgeMutate())` when no request is left after filtering;
 *         otherwise the request with the greatest timestamp copied with the merged
 *         property state, paired with the result of `buildMutation`
 * @throws RuntimeException when a request carries an operation that is not handled here,
 *                          or a `delete` targets a label whose consistency level is not
 *                          "strong"
 */
def buildOperation(invertedEdge: Option[S2EdgeLike], requestEdges: Seq[S2EdgeLike]): (S2EdgeLike, EdgeMutate) = {
  // Property state of the existing edge; empty when there is no existing edge.
  val oldPropsWithTs = invertedEdge match {
    case Some(stored) => propsToState(stored.getPropsWithTs())
    case None => Map.empty[LabelMeta, InnerValLikeWithTs]
  }

  // One merge function per request, positionally aligned with requestEdges.
  val mergers = requestEdges.map { edge =>
    def isStrong: Boolean = edge.innerLabel.consistencyLevel == "strong"

    edge.getOp() match {
      case op if op == GraphUtil.operations("insert") =>
        if (isStrong) S2Edge.mergeUpsert _
        else S2Edge.mergeInsertBulk _
      case op if op == GraphUtil.operations("insertBulk") =>
        S2Edge.mergeInsertBulk _
      case op if op == GraphUtil.operations("delete") =>
        if (isStrong) S2Edge.mergeDelete _
        else throw new RuntimeException("not supported")
      case op if op == GraphUtil.operations("update") =>
        S2Edge.mergeUpdate _
      case op if op == GraphUtil.operations("increment") =>
        S2Edge.mergeIncrement _
      case _ =>
        throw new RuntimeException(s"not supported operation on edge: $edge")
    }
  }

  val oldTs = invertedEdge.map(_.ts).getOrElse(minTsVal)

  // Drop requests stamped with the existing edge's timestamp, then order the rest by time.
  val pending = requestEdges.zip(mergers)
    .filterNot { case (edge, _) => edge.ts == oldTs }
    .sortBy { case (edge, _) => edge.ts }

  pending.lastOption match {
    case None =>
      // Nothing left to apply: hand back the first request with an empty mutation.
      (requestEdges.head, EdgeMutate())

    case Some((latestEdge, _)) =>
      // Thread the property state through every remaining request, oldest first.
      val mergedProps = pending.foldLeft(oldPropsWithTs) { case (state, (edge, merge)) =>
        merge((state, propsToState(edge.getPropsWithTs()), edge.ts, edge.innerLabel.schemaVersion))._1
      }

      val requestTs = latestEdge.ts
      /* version should be monotonically increasing so our RPC mutation should be applied safely */
      val newVersion = invertedEdge.map(e => e.getVersion() + incrementVersion).getOrElse(requestTs)

      // The edge timestamp is the later of the newest property timestamp and the newest request.
      val maxPropTs = mergedProps.values.map(_.ts).max
      val newTs = math.max(maxPropTs, requestTs)

      val timestampProp = InnerValLikeWithTs(InnerVal.withLong(newTs, latestEdge.innerLabel.schemaVersion), newTs)
      val propsWithTs = mergedProps + (LabelMeta.timestamp -> timestampProp)

      val edgeMutate = buildMutation(invertedEdge, latestEdge, newVersion, oldPropsWithTs, propsWithTs)
      val newEdge = latestEdge.copyEdgeWithState(propsWithTs)
      (newEdge, edgeMutate)
  }
}