in s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala [642:689]
def buildMutation(snapshotEdgeOpt: Option[S2EdgeLike],
requestEdge: S2EdgeLike,
newVersion: Long,
oldPropsWithTs: Map[LabelMeta, InnerValLikeWithTs],
newPropsWithTs: Map[LabelMeta, InnerValLikeWithTs]): EdgeMutate = {
if (oldPropsWithTs == newPropsWithTs) {
// all requests should be dropped. so empty mutation.
EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = None)
} else {
val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAtSeq)
val newOp = snapshotEdgeOpt match {
case None => requestEdge.getOp()
case Some(old) =>
val oldMaxTs = old.getPropsWithTs().asScala.map(_._2.ts).max
if (oldMaxTs > requestEdge.ts) old.getOp()
else requestEdge.getOp()
}
val newSnapshotEdge = requestEdge.copyOp(newOp).copyVersion(newVersion).copyEdgeWithState(newPropsWithTs)
val newSnapshotEdgeOpt = Option(newSnapshotEdge.toSnapshotEdge)
// delete request must always update snapshot.
if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.contains(LabelMeta.lastDeletedAt)) {
// no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt.
EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt)
} else {
val edgesToDelete = snapshotEdgeOpt match {
case Some(snapshotEdge) if snapshotEdge.getOp() != GraphUtil.operations("delete") =>
snapshotEdge.copyOp(GraphUtil.defaultOpByte)
.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
case _ => Nil
}
val edgesToInsert =
if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil
else {
val newEdge = requestEdge.copyOp(GraphUtil.defaultOpByte).copyVersion(newVersion).copyEdgeWithState(S2Edge.EmptyState)
newPropsWithTs.foreach { case (k, v) => newEdge.propertyInner(k.name, v.innerVal.value, v.ts) }
newEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
}
EdgeMutate(edgesToDelete = edgesToDelete, edgesToInsert = edgesToInsert, newSnapshotEdge = newSnapshotEdgeOpt)
}
}
}