protected def commitUpdate()

in s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala [114:232]


  /**
   * Computes the lock and release-lock snapshot edges for `edges` and hands them to `commitProcess`.
   *
   * When `statusCode` is 0 the path taken depends on `fetchedSnapshotEdgeOpt`:
   *  - empty: the request is squashed against no snapshot; the pending edge gets version
   *    `squashed.ts + 1` and the lock snapshot edge is derived from the squashed edge itself.
   *  - defined, no pending edge: the request is squashed against the fetched snapshot. If that
   *    produces no new snapshot edge the request is dropped and `Future.successful(true)` is
   *    returned; otherwise the lock is built on the fetched snapshot with version
   *    `fetched version + 1`.
   *  - defined, with a pending edge: if the pending edge's `lockTs + LockExpireDuration` is already
   *    in the past, the pending edge is replayed in front of `edges` and the lock is taken over;
   *    otherwise a failed future carrying a `PartialFailureException` is returned.
   *
   * Any other `statusCode` skips lock construction: only a release-lock snapshot edge is computed
   * (version `base + 2`) and it is passed to `commitProcess` in both snapshot-edge positions.
   *
   * In every locking path the release-lock snapshot edge is versioned one above the lock snapshot edge.
   *
   * @param edges                  requested edges; asserted to be non-empty
   * @param statusCode             0 selects the lock-acquiring paths, anything else the release-only path
   * @param fetchedSnapshotEdgeOpt snapshot edge read before this call, if any
   * @return the future produced by `commitProcess`, or one of the short-circuit futures described above
   */
  protected def commitUpdate(edges: Seq[S2EdgeLike],
                             statusCode: Byte,
                             fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] = {
    assert(edges.nonEmpty)

    if (statusCode == 0) {
      // Pair the fetched snapshot with its pending edge so the three lock states read as flat cases.
      fetchedSnapshotEdgeOpt.map(fetched => (fetched, fetched.getPendingEdgeOpt())) match {
        case None =>
          // Nothing was fetched for this snapshot edge: lock on top of the squashed request itself.
          val (merged, mutation) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)

          assert(mutation.newSnapshotEdge.isDefined)

          val lockedAt = Option(System.currentTimeMillis())
          val pending = merged.copyStatusCode(1).copyLockTs(lockedAt).copyVersion(merged.ts + 1)
          val lockEdge = merged.toSnapshotEdge.copy(pendingEdgeOpt = Option(pending))
          val releaseEdge = mutation.newSnapshotEdge.get.copy(
            statusCode = 0, pendingEdgeOpt = None, version = lockEdge.version + 1)

          commitProcess(statusCode, merged, fetchedSnapshotEdgeOpt, lockEdge, releaseEdge, mutation)

        case Some((current, None)) =>
          // A snapshot exists and carries no pending edge: no contention.
          val (merged, mutation) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
          mutation.newSnapshotEdge match {
            case None =>
              // Squashing produced no new snapshot edge, so there is nothing to commit.
              logger.debug(s"drop this requests: \n${edges.map(_.toLogString).mkString("\n")}")
              Future.successful(true)

            case Some(nextSnapshot) =>
              val lockedAt = Option(System.currentTimeMillis())
              val pending = merged.copyStatusCode(1).copyLockTs(lockedAt).copyVersion(current.getVersion() + 1)
              val lockEdge = current.toSnapshotEdge.copy(pendingEdgeOpt = Option(pending))
              val releaseEdge = nextSnapshot.copy(
                statusCode = 0, pendingEdgeOpt = None, version = lockEdge.version + 1)

              commitProcess(statusCode, merged, fetchedSnapshotEdgeOpt, lockEdge, releaseEdge, mutation)
          }

        case Some((current, Some(holder))) =>
          // NOTE(review): `.get` is evaluated eagerly; this presumably relies on every stored
          // pending edge having a lockTs (all paths in this method set one) - confirm.
          val lockExpired = holder.getLockTs().get + LockExpireDuration < System.currentTimeMillis()
          if (!lockExpired) {
            // Someone else still holds a live lock: fail so the caller can retry.
            val (merged, _) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
            Future.failed(new PartialFailureException(merged, 0, s"others[${holder.ts}] is mutating. me[${merged.ts}]"))
          } else {
            logger.debug(s"${holder.toLogString} has been expired.")

            // When the pending edge shares the snapshot's ts, squash from scratch;
            // otherwise squash on top of the fetched snapshot.
            val baseOpt: Option[S2EdgeLike] =
              if (holder.ts == current.ts) None else fetchedSnapshotEdgeOpt
            val (merged, mutation) = S2Edge.buildOperation(baseOpt, holder +: edges)

            val lockedAt = Option(System.currentTimeMillis())
            val takeoverPending = merged.copyStatusCode(1).copyLockTs(lockedAt).copyVersion(current.getVersion() + 1)
            val lockEdge = current.toSnapshotEdge.copy(pendingEdgeOpt = Option(takeoverPending))
            val releaseEdge = mutation.newSnapshotEdge.get.copy(
              statusCode = 0, pendingEdgeOpt = None, version = lockEdge.version + 1)

            commitProcess(statusCode, merged, fetchedSnapshotEdgeOpt, lockEdge, releaseEdge, mutation)
          }
      }
    } else {
      // Non-zero statusCode: the original author's note describes this as a retry after this
      // caller already holds the lock and a later step partially failed.
      // If the fetched snapshot still carries a pending edge, it is replayed before the request.
      val replayedEdges = fetchedSnapshotEdgeOpt match {
        case Some(fetched) if fetched.getPendingEdgeOpt().isDefined => fetched.pendingEdgeOpt.get +: edges
        case _ => edges
      }
      val (merged, mutation) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, replayedEdges)

      // Base version is the fetched snapshot's version when present, else the squashed edge's ts.
      val releaseVersion = fetchedSnapshotEdgeOpt.fold(merged.ts)(_.getVersion()) + 2
      val releaseEdge = mutation.newSnapshotEdge
        .getOrElse(merged.toSnapshotEdge)
        .copy(statusCode = 0, pendingEdgeOpt = None, version = releaseVersion)

      // The lock snapshot edge argument is ignored on this path, so the release edge fills both slots.
      commitProcess(statusCode, merged, fetchedSnapshotEdgeOpt, releaseEdge, releaseEdge, mutation)
    }
  }