in s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala [114:232]
/**
 * Prepares the lock / release-lock snapshot edges for a batch of mutations that target the same
 * snapshot edge (SN) and passes them on to `commitProcess`.
 *
 * Branches taken:
 *  - `statusCode == 0`, no fetched SN: the requests are squashed against an empty state and the
 *    squashed edge itself becomes the lock holder.
 *  - `statusCode == 0`, fetched SN without a pending edge: the requests are squashed against the
 *    SN. When that yields no new snapshot edge the requests are dropped and `true` is returned.
 *  - `statusCode == 0`, fetched SN with a pending edge whose lock has expired: the pending edge is
 *    replayed together with the requests and the lock is taken over.
 *  - `statusCode == 0`, fetched SN with a pending edge whose lock has NOT expired: the returned
 *    future fails with a `PartialFailureException`.
 *  - any other `statusCode`: only the release-lock snapshot edge is computed; it is passed to
 *    `commitProcess` in both the lock and the release-lock position.
 *
 * NOTE(review): the expired-lock branch calls `.get` on `newSnapshotEdge` without the emptiness
 * check used in the no-contention branch, and `getLockTs().get` is called on the pending edge
 * unconditionally; both throw if the Option is empty — confirm the invariants that rule this out.
 *
 * @param edges                  mutation requests; must be non-empty (asserted)
 * @param statusCode             0 selects the lock-acquiring branches, anything else the
 *                               release-only branch
 * @param fetchedSnapshotEdgeOpt snapshot edge supplied by the caller, if any
 * @return the future produced by `commitProcess`, an already-successful `true` when the requests
 *         are dropped, or a future failed with `PartialFailureException` on a live foreign lock
 */
protected def commitUpdate(edges: Seq[S2EdgeLike],
                           statusCode: Byte,
                           fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] = {
  assert(edges.nonEmpty)
  // Disabled check: assert(statusCode == 0 || fetchedSnapshotEdgeOpt.isDefined)

  if (statusCode == 0) {
    // Pair the fetched SN with its pending edge so that all three situations are handled by a
    // single match.
    fetchedSnapshotEdgeOpt.map(snapshot => (snapshot, snapshot.getPendingEdgeOpt())) match {
      case None =>
        /*
         * No one has ever mutated this SN.
         *   (merged, mutation) = buildOperation(None, edges)
         *   lockHolder  = merged.copy(statusCode = 1, lockTs = now, version = merged.ts + 1)
         *   lockEdge    = merged as snapshot edge, carrying lockHolder as pending edge
         *   releaseEdge = mutation.newSnapshotEdge with the pending edge cleared
         */
        val (merged, mutation) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
        assert(mutation.newSnapshotEdge.isDefined)
        val acquiredAt = Option(System.currentTimeMillis())
        val lockHolder = merged.copyStatusCode(1).copyLockTs(acquiredAt).copyVersion(merged.ts + 1)
        val lockEdge = merged.toSnapshotEdge.copy(pendingEdgeOpt = Option(lockHolder))
        val releaseEdge = mutation.newSnapshotEdge.get.copy(
          statusCode = 0,
          pendingEdgeOpt = None,
          version = lockEdge.version + 1)
        commitProcess(statusCode, merged, fetchedSnapshotEdgeOpt, lockEdge, releaseEdge, mutation)

      case Some((snapshotEdge, None)) =>
        /*
         * Others finished a commit on this SN, but there is no contention right now.
         *   (merged, mutation) = buildOperation(fetched SN, edges)
         *   lockHolder  = merged.copy(statusCode = 1, lockTs = now, version = SN.version + 1)
         *   lockEdge    = SN carrying lockHolder as pending edge
         *   releaseEdge = mutation.newSnapshotEdge with the pending edge cleared
         */
        val (merged, mutation) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
        if (mutation.newSnapshotEdge.isDefined) {
          val acquiredAt = Option(System.currentTimeMillis())
          val lockHolder =
            merged.copyStatusCode(1).copyLockTs(acquiredAt).copyVersion(snapshotEdge.getVersion() + 1)
          val lockEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(lockHolder))
          val releaseEdge = mutation.newSnapshotEdge.get.copy(
            statusCode = 0,
            pendingEdgeOpt = None,
            version = lockEdge.version + 1)
          commitProcess(statusCode, merged, fetchedSnapshotEdgeOpt, lockEdge, releaseEdge, mutation)
        } else {
          // buildOperation produced no new snapshot edge: drop the requests and report success.
          logger.debug(s"drop this requests: \n${edges.map(_.toLogString).mkString("\n")}")
          Future.successful(true)
        }

      case Some((snapshotEdge, Some(pendingEdge))) =>
        val lockDeadline = pendingEdge.getLockTs().get + LockExpireDuration
        if (lockDeadline < System.currentTimeMillis()) {
          /*
           * The lock held by pendingEdge has expired: replay it ahead of our own requests.
           *   base = None        if pendingEdge.ts == SN.ts
           *          fetched SN  otherwise
           *   (merged, mutation) = buildOperation(base, pendingEdge +: edges)
           *   renewedPending = merged.copy(statusCode = 1, lockTs = now, version = SN.version + 1)
           *   lockEdge       = SN carrying renewedPending as pending edge
           *   releaseEdge    = mutation.newSnapshotEdge with the pending edge cleared
           */
          logger.debug(s"${pendingEdge.toLogString} has been expired.")
          val baseSnapshotOpt =
            if (pendingEdge.ts == snapshotEdge.ts) None
            else fetchedSnapshotEdgeOpt
          val (merged, mutation) = S2Edge.buildOperation(baseSnapshotOpt, pendingEdge +: edges)
          val acquiredAt = Option(System.currentTimeMillis())
          val renewedPending =
            merged.copyStatusCode(1).copyLockTs(acquiredAt).copyVersion(snapshotEdge.getVersion() + 1)
          val lockEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(renewedPending))
          val releaseEdge = mutation.newSnapshotEdge.get.copy(
            statusCode = 0,
            pendingEdgeOpt = None,
            version = lockEdge.version + 1)
          commitProcess(statusCode, merged, fetchedSnapshotEdgeOpt, lockEdge, releaseEdge, mutation)
        } else {
          /*
           * Someone else holds a lock on this SN that has not expired yet.
           * We cannot proceed, so fail the future with a PartialFailureException.
           */
          val squashedEdge = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)._1
          Future.failed(new PartialFailureException(squashedEdge, 0, s"others[${pendingEdge.ts}] is mutating. me[${squashedEdge.ts}]"))
        }
    }
  } else {
    /*
     * statusCode > 0: we hold the lock ourselves and an earlier attempt failed part-way
     * (on mutate, increment or releaseLock). Keep going with the commit process.
     *   if the SN is empty =>
     *     no one has ever succeeded in committing on this SN; this is the first mutation attempt.
     *   else =>
     *     there has been a successful commit on this SN.
     *   in both cases:
     *     (merged, mutation) = buildOperation(SN, requests)
     *     releaseEdge = mutation.newSnapshotEdge with the pending edge cleared
     */
    // If the fetched SN still carries a pending edge, replay it ahead of the requests.
    val requests = fetchedSnapshotEdgeOpt match {
      case Some(snapshot) if snapshot.getPendingEdgeOpt().isDefined =>
        snapshot.pendingEdgeOpt.get +: edges
      case _ =>
        edges
    }
    val (merged, mutation) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, requests)
    val baseVersion = fetchedSnapshotEdgeOpt.map(_.getVersion()).getOrElse(merged.ts)
    val releaseVersion = baseVersion + 2
    // Fall back to the squashed edge's own snapshot form when no new snapshot edge was produced.
    val releaseEdge = mutation.newSnapshotEdge
      .getOrElse(merged.toSnapshotEdge)
      .copy(statusCode = 0, pendingEdgeOpt = None, version = releaseVersion)
    // The lock argument is ignored on this path, so the release edge is passed twice.
    commitProcess(statusCode, merged, fetchedSnapshotEdgeOpt, releaseEdge, releaseEdge, mutation)
  }
}