in client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala [868:965]
/**
 * Resolves the Celeborn shuffle id for `appShuffleId` and replies with it through `context`.
 *
 * Writer requests get-or-create the per-`appShuffleId` table in `shuffleIdMapping`; reader
 * requests only look it up. When no table exists, a warning is logged and
 * `UNKNOWN_APP_SHUFFLE_ID` is sent back. Everything after that runs while synchronized on the
 * table:
 *
 *  - Writer: if `appShuffleIdentifier` already has an entry, its shuffle id is returned.
 *    Otherwise, when the shuffle is determinate, the stage is not a barrier stage and
 *    `isCelebornSkewShuffleOrChildShuffle` is false, the most recently added entry whose flag
 *    is `true` is reused if there is one. In all other cases a new id is taken from
 *    `shuffleIdGenerator` and recorded under `appShuffleIdentifier`; for barrier stages every
 *    entry whose flag is `true` is first unregistered and has its flag set to `false`.
 *  - Reader: the most recently added shuffle id whose flag is `true` and whose mapper attempts
 *    are all finished is returned.
 *
 * @param context              RPC context that receives the `PbGetShuffleIdResponse`
 * @param appShuffleId         application-level shuffle id used as key of `shuffleIdMapping`
 * @param appShuffleIdentifier key of the entry inside the per-`appShuffleId` table
 * @param isWriter             `true` for the writer path, `false` for the reader path
 * @param isBarrierStage       whether the requesting stage is a barrier stage
 * @throws UnsupportedOperationException if a writer needs the determinate flag but
 *         `appShuffleDeterminateMap` has no entry for `appShuffleId`, or if a reader finds no
 *         flagged shuffle id whose mapper attempts are all finished
 */
private def handleGetShuffleIdForApp(
    context: RpcCallContext,
    appShuffleId: Int,
    appShuffleIdentifier: String,
    isWriter: Boolean,
    isBarrierStage: Boolean): Unit = {
  // Table for one appShuffleId: appShuffleIdentifier -> (shuffle id, flag). The flag is
  // cleared below when a barrier stage unregisters the shuffle.
  type ShuffleIdTable = scala.collection.mutable.LinkedHashMap[String, (Int, Boolean)]

  // Every non-throwing path answers with a response that carries only a shuffle id.
  def replyWith(id: Int): Unit =
    context.reply(PbGetShuffleIdResponse.newBuilder().setShuffleId(id).build())

  val shuffleIds: ShuffleIdTable =
    if (!isWriter) {
      shuffleIdMapping.get(appShuffleId)
    } else {
      shuffleIdMapping.computeIfAbsent(
        appShuffleId,
        new function.Function[Int, ShuffleIdTable]() {
          override def apply(id: Int): ShuffleIdTable = {
            val newShuffleId = shuffleIdGenerator.getAndIncrement()
            logInfo(s"generate new shuffleId $newShuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
            scala.collection.mutable.LinkedHashMap(appShuffleIdentifier -> (newShuffleId, true))
          }
        })
    }

  if (shuffleIds == null) {
    logWarning(s"unknown appShuffleId $appShuffleId, maybe no shuffle data for this shuffle")
    replyWith(UNKNOWN_APP_SHUFFLE_ID)
  } else {
    shuffleIds.synchronized {
      if (isWriter) {
        shuffleIds.get(appShuffleIdentifier) match {
          case Some((shuffleId, _)) =>
            replyWith(shuffleId)

          case None =>
            // Option(...) turns a null lookup result into None; keep the lookup wrapped so a
            // missing entry is never mistaken for `false`.
            Option(appShuffleDeterminateMap.get(appShuffleId)) match {
              case None =>
                throw new UnsupportedOperationException(
                  s"unexpected! unknown appShuffleId $appShuffleId when checking shuffle deterministic level")

              case Some(determinate) =>
                // For barrier stages, all tasks are re-executed when the stage is re-run,
                // similar to an indeterminate stage. So if a barrier stage is being
                // re-executed, the previous stage/attempt needs to be cleaned up as it is
                // entirely unusable.
                val reuseAllowed =
                  determinate && !isBarrierStage &&
                    !isCelebornSkewShuffleOrChildShuffle(appShuffleId)
                val reusableId: Option[Int] =
                  if (reuseAllowed) {
                    // Newest entry first, so the most recently added flagged id wins.
                    shuffleIds.values.toSeq.reverse.collectFirst { case (id, true) => id }
                  } else {
                    None
                  }

                val resolvedId: Int = reusableId match {
                  case Some(id) =>
                    logInfo(s"reuse existing shuffleId $id for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
                    id

                  case None =>
                    if (isBarrierStage) {
                      // Unregister previous shuffle(s) which are still valid. `filter` yields
                      // a separate map, so writing the cleared flags back does not disturb
                      // the traversal.
                      val invalidated = shuffleIds.filter(_._2._2).map {
                        case (identifier, (previousId, _)) =>
                          unregisterShuffle(previousId)
                          (identifier, (previousId, false))
                      }
                      shuffleIds ++= invalidated
                    }
                    val newShuffleId = shuffleIdGenerator.getAndIncrement()
                    logInfo(s"generate new shuffleId $newShuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
                    shuffleIds.put(appShuffleIdentifier, (newShuffleId, true))
                    newShuffleId
                }
                replyWith(resolvedId)
            }
        }
      } else {
        // A shuffle id can be served to readers only if all of its mapper attempts are done.
        def allMapTasksFinished(shuffleId: Int): Boolean =
          ClientUtils.areAllMapperAttemptsFinished(commitManager.getMapperAttempts(shuffleId))

        val flaggedIdsNewestFirst =
          shuffleIds.values.collect { case (id, true) => id }.toSeq.reverse

        flaggedIdsNewestFirst.find(allMapTasksFinished) match {
          case Some(shuffleId) =>
            logDebug(
              s"get shuffleId $shuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier isWriter $isWriter")
            replyWith(shuffleId)

          case None =>
            throw new UnsupportedOperationException(
              s"unexpected! there is no finished map stage associated with appShuffleId $appShuffleId")
        }
      }
    }
  }
}