in s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala [54:112]
/**
 * Commits `edges` via `commitUpdate`, retrying on known failures until `MaxRetryNum` is reached.
 *
 * Failure handling, per exception raised by the commit future:
 *  - `FetchTimeoutException`: the snapshot edge is fetched again right away and the commit is
 *    retried with the freshly fetched snapshot.
 *  - `PartialFailureException`: the retry is scheduled on `scheduledThreadPool` after
 *    `exponentialBackOff(tryNum)` milliseconds. When the failed status code is 0 (lock
 *    acquisition failed) the snapshot edge is fetched again first; otherwise the retry reuses
 *    `fetchedSnapshotEdgeOpt` and passes the failed status code on as the new `statusCode`.
 *  - any other `Exception`: logged, and the result is `false` without further retries.
 *
 * @param tryNum                 number of attempts already made; compared against `MaxRetryNum`
 * @param edges                  edges to commit; the first one is used for snapshot re-fetching
 * @param statusCode             status code handed to `commitUpdate`
 * @param fetchedSnapshotEdgeOpt snapshot edge fetched before this attempt, if any
 * @return a future holding the commit result, or `false` when retries are exhausted or an
 *         unexpected `Exception` occurred
 */
def retry(tryNum: Int)(edges: Seq[S2EdgeLike], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] = {
  // Fetches the snapshot edge again and retries the commit against the fresh snapshot.
  def refetchAndRetry(): Future[Boolean] =
    optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, _) =>
      retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
    }

  if (tryNum >= MaxRetryNum) {
    edges.foreach { edge =>
      logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}")
    }
    Future.successful(false)
  } else {
    val future = commitUpdate(edges, statusCode, fetchedSnapshotEdgeOpt)
    // `foreach` only runs on success; it replaces the deprecated `onSuccess`.
    future.foreach { _ =>
      logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n")
    }
    future recoverWith {
      case FetchTimeoutException(retryEdge) =>
        logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
        /* fetch failed. re-fetch should be done */
        refetchAndRetry()
      case PartialFailureException(retryEdge, failedStatusCode, failReason) =>
        val status = failedStatusCode match {
          case 0 => "AcquireLock failed."
          case 1 => "Mutation failed."
          case 2 => "Increment failed."
          case 3 => "ReleaseLock failed."
          // Covers code 4 and any unexpected code, so logging can never raise a MatchError.
          case _ => "Unknown"
        }
        logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${failReason}")
        /* retry logic */
        val promise = Promise[Boolean]
        val backOff = exponentialBackOff(tryNum)
        scheduledThreadPool.schedule(new Runnable {
          override def run(): Unit = {
            // A synchronous throw here would otherwise be swallowed by the scheduler and leave
            // the promise uncompleted forever, so it is converted into a failed future.
            val next: Future[Boolean] =
              try {
                if (failedStatusCode == 0) {
                  // acquire lock failed: another writer is mutating, so this thread needs to
                  // re-fetch the snapshot edge before retrying.
                  refetchAndRetry()
                } else {
                  // partial failure occurred while self locked and mutating.
                  // assert(fetchedSnapshotEdgeOpt.nonEmpty)
                  retry(tryNum + 1)(edges, failedStatusCode, fetchedSnapshotEdgeOpt)
                }
              } catch {
                case scala.util.control.NonFatal(ex) => Future.failed(ex)
              }
            promise.completeWith(next)
          }
        }, backOff, TimeUnit.MILLISECONDS)
        promise.future
      case ex: Exception =>
        logger.error("Unknown exception", ex)
        Future.successful(false)
    }
  }
}