def buildOperation()

in s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala [582:640]


  def buildOperation(invertedEdge: Option[S2EdgeLike], requestEdges: Seq[S2EdgeLike]): (S2EdgeLike, EdgeMutate) = {
    //            logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}")
    //            logger.debug(s"requestEdge: ${requestEdge.toStringRaw}")
    val oldPropsWithTs =
    if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs]
    else propsToState(invertedEdge.get.getPropsWithTs())

    val funcs = requestEdges.map { edge =>
      if (edge.getOp() == GraphUtil.operations("insert")) {
        edge.innerLabel.consistencyLevel match {
          case "strong" => S2Edge.mergeUpsert _
          case _ => S2Edge.mergeInsertBulk _
        }
      } else if (edge.getOp() == GraphUtil.operations("insertBulk")) {
        S2Edge.mergeInsertBulk _
      } else if (edge.getOp() == GraphUtil.operations("delete")) {
        edge.innerLabel.consistencyLevel match {
          case "strong" => S2Edge.mergeDelete _
          case _ => throw new RuntimeException("not supported")
        }
      }
      else if (edge.getOp() == GraphUtil.operations("update")) S2Edge.mergeUpdate _
      else if (edge.getOp() == GraphUtil.operations("increment")) S2Edge.mergeIncrement _
      else throw new RuntimeException(s"not supported operation on edge: $edge")
    }

    val oldTs = invertedEdge.map(_.ts).getOrElse(minTsVal)
    val requestWithFuncs = requestEdges.zip(funcs).filter(oldTs != _._1.ts).sortBy(_._1.ts)

    if (requestWithFuncs.isEmpty) {
      (requestEdges.head, EdgeMutate())
    } else {
      val requestEdge = requestWithFuncs.last._1
      var prevPropsWithTs = oldPropsWithTs

      for {
        (requestEdge, func) <- requestWithFuncs
      } {
        val (_newPropsWithTs, _) = func((prevPropsWithTs, propsToState(requestEdge.getPropsWithTs()), requestEdge.ts, requestEdge.innerLabel.schemaVersion))
        prevPropsWithTs = _newPropsWithTs
        //        logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n")
      }
      val requestTs = requestEdge.ts
      /* version should be monotoniously increasing so our RPC mutation should be applied safely */
      val newVersion = invertedEdge.map(e => e.getVersion() + incrementVersion).getOrElse(requestTs)
      val maxTs = prevPropsWithTs.map(_._2.ts).max
      val newTs = if (maxTs > requestTs) maxTs else requestTs
      val propsWithTs = prevPropsWithTs ++
        Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.innerLabel.schemaVersion), newTs))

      val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs)

      //      logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
      //      logger.error(s"$propsWithTs")
      val newEdge = requestEdge.copyEdgeWithState(propsWithTs)

      (newEdge, edgeMutate)
    }
  }