def retry()

in s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala [54:112]


  def retry(tryNum: Int)(edges: Seq[S2EdgeLike], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] = {
    if (tryNum >= MaxRetryNum) {
      edges.foreach { edge =>
        logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}")
      }

      Future.successful(false)
    } else {
      val future = commitUpdate(edges, statusCode, fetchedSnapshotEdgeOpt)
      future.onSuccess {
        case success =>
          logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n")
      }
      future recoverWith {
        case FetchTimeoutException(retryEdge) =>
          logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
          /* fetch failed. re-fetch should be done */
          optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
            retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
          }

        case PartialFailureException(retryEdge, failedStatusCode, faileReason) =>
          val status = failedStatusCode match {
            case 0 => "AcquireLock failed."
            case 1 => "Mutation failed."
            case 2 => "Increment failed."
            case 3 => "ReleaseLock failed."
            case 4 => "Unknown"
          }
          logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")

          /* retry logic */
          val promise = Promise[Boolean]
          val backOff = exponentialBackOff(tryNum)
          scheduledThreadPool.schedule(new Runnable {
            override def run(): Unit = {
              val future = if (failedStatusCode == 0) {
                // acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge.
                /* fetch failed. re-fetch should be done */
                optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
                  retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
                }
              } else {
                // partial failure occur while self locked and mutating.
                //            assert(fetchedSnapshotEdgeOpt.nonEmpty)
                retry(tryNum + 1)(edges, failedStatusCode, fetchedSnapshotEdgeOpt)
              }
              promise.completeWith(future)
            }

          }, backOff, TimeUnit.MILLISECONDS)
          promise.future

        case ex: Exception =>
          logger.error("Unknown exception", ex)
          Future.successful(false)
      }
    }
  }