private def handleGetShuffleIdForApp()

in client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala [868:965]


  /**
   * Resolves the shuffle id to use for an application shuffle and replies with it through
   * `context`.
   *
   * Each entry of the per-app-shuffle mapping is `appShuffleIdentifier -> (shuffleId, valid)`,
   * kept in insertion order.
   *
   * Writer requests (`isWriter == true`):
   *  - create the mapping with a freshly generated shuffle id if `appShuffleId` has none yet;
   *  - reply with the id already recorded for `appShuffleIdentifier`, if any;
   *  - otherwise reuse the most recently inserted valid id when the shuffle is determinate,
   *    is not a barrier stage and is not a Celeborn skew shuffle (or child shuffle);
   *  - otherwise generate and record a new id; for barrier stages every still-valid id is
   *    unregistered and marked invalid first.
   *
   * Reader requests (`isWriter == false`):
   *  - reply with `UNKNOWN_APP_SHUFFLE_ID` when no mapping exists for `appShuffleId`;
   *  - otherwise reply with the most recently inserted valid id whose mapper attempts are all
   *    finished.
   *
   * @param context              RPC context used to send the `PbGetShuffleIdResponse`
   * @param appShuffleId         shuffle id as known to the application
   * @param appShuffleIdentifier identifier of the requesting stage attempt within the app shuffle
   * @param isWriter             whether the request comes from the writing side of the shuffle
   * @param isBarrierStage       whether the requesting stage is a barrier stage
   * @throws UnsupportedOperationException if a writer asks for an app shuffle that has no entry
   *                                       in `appShuffleDeterminateMap`, or a reader finds no
   *                                       valid shuffle with all mapper attempts finished
   */
  private def handleGetShuffleIdForApp(
      context: RpcCallContext,
      appShuffleId: Int,
      appShuffleIdentifier: String,
      isWriter: Boolean,
      isBarrierStage: Boolean): Unit = {
    type IdentifierMapping = scala.collection.mutable.LinkedHashMap[String, (Int, Boolean)]

    // Builds the response for the given id and sends it to the caller.
    def replyWith(resolvedId: Int): Unit =
      context.reply(PbGetShuffleIdResponse.newBuilder().setShuffleId(resolvedId).build())

    // Readers only look the mapping up; writers create it on first contact.
    val shuffleIds =
      if (!isWriter) {
        shuffleIdMapping.get(appShuffleId)
      } else {
        shuffleIdMapping.computeIfAbsent(
          appShuffleId,
          new function.Function[Int, IdentifierMapping]() {
            override def apply(id: Int): IdentifierMapping = {
              val newShuffleId = shuffleIdGenerator.getAndIncrement()
              logInfo(s"generate new shuffleId $newShuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
              scala.collection.mutable.LinkedHashMap(appShuffleIdentifier -> (newShuffleId, true))
            }
          })
      }

    if (shuffleIds == null) {
      logWarning(s"unknown appShuffleId $appShuffleId, maybe no shuffle data for this shuffle")
      replyWith(UNKNOWN_APP_SHUFFLE_ID)
    } else {
      // All reads and updates of one app shuffle's mapping happen under its own lock.
      shuffleIds.synchronized {
        if (!isWriter) {
          // Walk the valid shuffle ids from the latest inserted to the oldest and pick the
          // first one whose mapper attempts have all finished.
          val validIdsNewestFirst =
            shuffleIds.values.collect { case (validId, true) => validId }.toSeq.reverse
          val finishedShuffle = validIdsNewestFirst.find { validId =>
            ClientUtils.areAllMapperAttemptsFinished(commitManager.getMapperAttempts(validId))
          }
          finishedShuffle match {
            case None =>
              throw new UnsupportedOperationException(
                s"unexpected! there is no finished map stage associated with appShuffleId $appShuffleId")
            case Some(shuffleId) =>
              logDebug(
                s"get shuffleId $shuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier isWriter $isWriter")
              replyWith(shuffleId)
          }
        } else {
          shuffleIds.get(appShuffleIdentifier) match {
            case Some((shuffleId, _)) =>
              // This identifier was already assigned an id: hand the same one back.
              replyWith(shuffleId)
            case None =>
              // A missing determinate-level entry surfaces as None and is an unexpected state.
              Option(appShuffleDeterminateMap.get(appShuffleId)) match {
                case None =>
                  throw new UnsupportedOperationException(
                    s"unexpected! unknown appShuffleId $appShuffleId when checking shuffle deterministic level")
                case Some(determinate) =>
                  // A re-run barrier stage re-executes all of its tasks, much like an
                  // indeterminate stage, so output of earlier attempts is never reused.
                  val mayReuse =
                    determinate && !isBarrierStage && !isCelebornSkewShuffleOrChildShuffle(
                      appShuffleId)
                  val candidateShuffle =
                    if (mayReuse) shuffleIds.values.toSeq.reverse.find(_._2) else None

                  val shuffleId = candidateShuffle match {
                    case Some((id, _)) =>
                      logInfo(s"reuse existing shuffleId $id for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
                      id
                    case None =>
                      if (isBarrierStage) {
                        // Unregister every shuffle that is still valid and flag it invalid.
                        // filter copies the entries first, so the map is not mutated while
                        // it is being traversed.
                        val invalidated =
                          shuffleIds.filter { case (_, (_, valid)) => valid }.map {
                            case (identifier, (staleShuffleId, _)) =>
                              unregisterShuffle(staleShuffleId)
                              (identifier, (staleShuffleId, false))
                          }
                        shuffleIds ++= invalidated
                      }
                      val newShuffleId = shuffleIdGenerator.getAndIncrement()
                      logInfo(s"generate new shuffleId $newShuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
                      shuffleIds.put(appShuffleIdentifier, (newShuffleId, true))
                      newShuffleId
                  }
                  replyWith(shuffleId)
              }
          }
        }
      }
    }
  }