in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala [747:829]
/**
 * Handles a push-data request targeting a MAP partition.
 *
 * Flow: start the role-specific push timer, resolve the [[PartitionLocation]]
 * (primary vs. replica), wrap the caller's callback so metrics/location are
 * attached to the response, then short-circuit on a missing location or a
 * shutting-down worker before writing the payload to the partition's
 * [[FileWriter]].
 *
 * @param pushData the incoming push request (shuffle key, mode, partition id, body)
 * @param callback RPC callback used to acknowledge or fail the push
 */
def handleMapPartitionPushData(pushData: PushData, callback: RpcResponseCallback): Unit = {
  val shuffleKey = pushData.shuffleKey
  val mode = PartitionLocation.getMode(pushData.mode)
  val body = pushData.body.asInstanceOf[NettyManagedBuffer].getBuf
  val isPrimary = mode == PartitionLocation.Mode.PRIMARY

  // Timer key is the request id; the timer is stopped by the wrapped callback
  // when the response is delivered.
  val key = s"${pushData.requestId}"
  if (isPrimary) {
    workerSource.startTimer(WorkerSource.PRIMARY_PUSH_DATA_TIME, key)
  } else {
    workerSource.startTimer(WorkerSource.REPLICA_PUSH_DATA_TIME, key)
  }

  // find FileWriter responsible for the data
  val location =
    if (isPrimary) {
      partitionLocationInfo.getPrimaryLocation(shuffleKey, pushData.partitionUniqueId)
    } else {
      partitionLocationInfo.getReplicaLocation(shuffleKey, pushData.partitionUniqueId)
    }

  val wrappedCallback =
    new WrappedRpcResponseCallback(
      pushData.`type`(),
      isPrimary,
      pushData.requestId,
      null,
      location,
      if (isPrimary) WorkerSource.PRIMARY_PUSH_DATA_TIME else WorkerSource.REPLICA_PUSH_DATA_TIME,
      callback)

  // Unknown partition: locationIsNull replies through the callbacks itself,
  // so we only need to bail out here.
  if (locationIsNull(
      pushData.`type`(),
      shuffleKey,
      pushData.partitionUniqueId,
      null,
      location,
      callback,
      wrappedCallback)) return

  // During worker shutdown, worker will return HARD_SPLIT for all existed partition.
  // This should before return exception to make current push request revive and retry.
  if (shutdown.get()) {
    logInfo(s"Push data return HARD_SPLIT for shuffle $shuffleKey since worker shutdown.")
    callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
    return
  }

  val fileWriter =
    getFileWriterAndCheck(pushData.`type`(), location, isPrimary, callback) match {
      case (true, _) => return
      case (false, f: FileWriter) => f
    }

  // for mappartition we will not check whether disk full or split partition
  fileWriter.incrementPendingWrites()

  // TODO: when this worker is the primary and the location has a peer
  // (location.hasPeer && isPrimary), the data should also be forwarded to the
  // replica. Replication is not implemented yet, so every request is
  // acknowledged immediately regardless of role. (The original code had an
  // if/else here whose two branches were identical.)
  wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))

  try {
    fileWriter.write(body)
  } catch {
    case e: AlreadyClosedException =>
      // Writer was closed (e.g. the map attempt already ended): undo the
      // pending-write reservation taken above and log context for debugging.
      fileWriter.decrementPendingWrites()
      val (mapId, attemptId) = getMapAttempt(body)
      val endedAttempt =
        if (shuffleMapperAttempts.containsKey(shuffleKey)) {
          shuffleMapperAttempts.get(shuffleKey).get(mapId)
        } else -1
      // TODO just info log for ended attempt
      logWarning(s"Append data failed for task(shuffle $shuffleKey, map $mapId, attempt" +
        s" $attemptId), caused by AlreadyClosedException, endedAttempt $endedAttempt," +
        s" error message: ${e.getMessage}")
    case e: Exception =>
      // NOTE(review): unlike the AlreadyClosedException branch, pendingWrites is
      // NOT decremented here — confirm whether that leak of the reservation is
      // intentional before changing it.
      logError("Exception encountered when write.", e)
  }
}