in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala [979:1061]
/**
 * Handles a PushData request for a map partition.
 *
 * Resolves the partition location (primary or replica, depending on the request mode) and
 * its writer, hands the payload to `writeLocalData`, then blocks until the local write
 * completes and reports the outcome through the callback.
 *
 * @param pushData the incoming request; its body is cast to `NettyManagedBuffer`
 * @param callback RPC callback that receives the final success or failure
 */
def handleMapPartitionPushData(pushData: PushData, callback: RpcResponseCallback): Unit = {
  val shuffleKey = pushData.shuffleKey
  val mode = PartitionLocation.getMode(pushData.mode)
  val body = pushData.body.asInstanceOf[NettyManagedBuffer].getBuf
  val isPrimary = mode == PartitionLocation.Mode.PRIMARY

  // Select the timer name once: it is started here and also passed to the wrapped callback.
  val pushDataTimer =
    if (isPrimary) WorkerSource.PRIMARY_PUSH_DATA_TIME else WorkerSource.REPLICA_PUSH_DATA_TIME
  val key = s"${pushData.requestId}"
  workerSource.startTimer(pushDataTimer, key)

  // find FileWriter responsible for the data
  val location =
    if (isPrimary) {
      partitionLocationInfo.getPrimaryLocation(shuffleKey, pushData.partitionUniqueId)
    } else {
      partitionLocationInfo.getReplicaLocation(shuffleKey, pushData.partitionUniqueId)
    }

  val wrappedCallback =
    new WrappedRpcResponseCallback(
      pushData.`type`(),
      isPrimary,
      pushData.requestId,
      null,
      location,
      pushDataTimer,
      callback)

  // Both checks below are handed the raw callback; presumably they report the failure
  // themselves before signalling that this method should stop — confirm in the helpers.
  if (locationIsNull(
      pushData.`type`(),
      shuffleKey,
      pushData.partitionUniqueId,
      location,
      callback)) return

  val fileWriter =
    getFileWriterAndCheck(pushData.`type`(), location, isPrimary, callback) match {
      case (true, _) => return
      case (false, f: PartitionDataWriter) => f
    }

  // for mappartition we will not check whether disk full or split partition

  // The pending-write count is bumped before the closed check and rolled back if the
  // writer turns out to be closed, so the increment/decrement stay paired on this path.
  fileWriter.incrementPendingWrites()
  if (fileWriter.isClosed) {
    val fileInfo = fileWriter.getCurrentFileInfo
    logWarning(
      s"[handleMapPartitionPushData] FileWriter is already closed! File path ${fileInfo.getFilePath} " +
        s"length ${fileInfo.getFileLength}")
    // NOTE(review): this path reports through the raw `callback`, not `wrappedCallback`,
    // and nothing in this method stops the timer started above — confirm this is intended.
    callback.onFailure(new CelebornIOException("File already closed!"))
    fileWriter.decrementPendingWrites()
    return
  }

  val writePromise = Promise[Array[StatusCode]]()
  writeLocalData(Seq(fileWriter), body, shuffleKey, isPrimary, None, writePromise)

  // Sending data to the replica for a primary with a peer is still a to-do, so every
  // request takes the same path: wait for the local write and report its status.
  // Duration.Inf means the calling thread blocks until the promise is completed.
  Try(Await.result(writePromise.future, Duration.Inf)) match {
    case Success(result) =>
      // A single writer was submitted, so only the first status is inspected.
      if (result(0) != StatusCode.SUCCESS) {
        wrappedCallback.onFailure(new CelebornIOException("Write data failed!"))
      } else {
        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
      }
    case Failure(e) => wrappedCallback.onFailure(e)
  }
}