in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala [130:384]
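// Handles a PushData request for one partition: resolves the target PartitionLocation,
// replicates to the peer when this worker is the primary, responds to the client, and
// finally appends the payload to the local FileWriter.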
def handlePushData(pushData: PushData, callback: RpcResponseCallback): Unit = {
val shuffleKey = pushData.shuffleKey
val mode = PartitionLocation.getMode(pushData.mode)
val body = pushData.body.asInstanceOf[NettyManagedBuffer].getBuf
val isPrimary = mode == PartitionLocation.Mode.PRIMARY
// For tests: when the corresponding test flag is set, silently drop the first primary/replica push to simulate a push data timeout.
if (isPrimary && testPushPrimaryDataTimeout &&
!PushDataHandler.pushPrimaryDataTimeoutTested.getAndSet(true)) {
return
}
if (!isPrimary && testPushReplicaDataTimeout &&
!PushDataHandler.pushReplicaDataTimeoutTested.getAndSet(true)) {
return
}
val key = s"${pushData.requestId}"
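// Wrap the callback with a timer so push latency is reported under the primary or
// replica push-data metric, keyed by the request id.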
val callbackWithTimer =
if (isPrimary) {
new RpcResponseCallbackWithTimer(
workerSource,
WorkerSource.PRIMARY_PUSH_DATA_TIME,
key,
callback)
} else {
new RpcResponseCallbackWithTimer(
workerSource,
WorkerSource.REPLICA_PUSH_DATA_TIME,
key,
callback)
}
// find FileWriter responsible for the data
val location =
if (isPrimary) {
partitionLocationInfo.getPrimaryLocation(shuffleKey, pushData.partitionUniqueId)
} else {
partitionLocationInfo.getReplicaLocation(shuffleKey, pushData.partitionUniqueId)
}
// Fetching the real batchId from the body would add extra cost and is meaningless for replication.
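// Only the primary replicates, and only when the location has a peer.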
val doReplicate = location != null && location.hasPeer && isPrimary
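// softSplit is handed to checkDiskFullAndSplit below; when it is set, this push is
// still written but the client is told to soft split the partition.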
val softSplit = new AtomicBoolean(false)
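// No location found: consult shuffleMapperAttempts and StorageManager below to decide
// whether this targets an already committed or hard-split partition, or is a genuine failure.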
if (location == null) {
val (mapId, attemptId) = getMapAttempt(body)
// MapperAttempts for a shuffle exist once any CommitFiles request succeeds.
// A shuffle can trigger multiple CommitFiles requests, for example when HARD_SPLIT happens or at StageEnd.
// If MapperAttempts exists but its value for the mapId is -1 (-1 means the map has not finished yet),
// it's probably because a CommitFiles for HARD_SPLIT happened.
if (shuffleMapperAttempts.containsKey(shuffleKey)) {
if (-1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) {
// partition data has already been committed
logInfo(
s"[Case1] Received push data from speculative task (shuffle $shuffleKey, map $mapId, " +
s"attempt $attemptId), but this mapper has already ended.")
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
} else {
logInfo(
s"Received push data for committed hard split partition of (shuffle $shuffleKey, " +
s"map $mapId, attempt $attemptId)")
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
}
} else {
if (storageManager.shuffleKeySet().contains(shuffleKey)) {
// If the shuffle key is not in shuffleMapperAttempts but is present in
// StorageManager, this partition should be a HARD_SPLIT partition: after a
// worker restart, some tasks may still push data to this HARD_SPLIT partition.
logInfo(s"[Case2] Received push data for committed hard split partition of " +
s"(shuffle $shuffleKey, map $mapId, attempt $attemptId)")
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
} else {
logWarning(s"While handle PushData, Partition location wasn't found for " +
s"task(shuffle $shuffleKey, map $mapId, attempt $attemptId, uniqueId ${pushData.partitionUniqueId}).")
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_FAIL_PARTITION_NOT_FOUND))
}
}
return
}
// During worker shutdown, the worker returns HARD_SPLIT for every existing partition.
// This check must come before returning any exception, so the current push data can be revived and retried.
if (shutdown.get()) {
logInfo(s"Push data return HARD_SPLIT for shuffle $shuffleKey since worker shutdown.")
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
return
}
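// Fetch the FileWriter of this working partition and fail fast if it already recorded
// a write error from an earlier push.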
val fileWriter = location.asInstanceOf[WorkingPartition].getFileWriter
val exception = fileWriter.getException
if (exception != null) {
val cause =
if (isPrimary) {
StatusCode.PUSH_DATA_WRITE_FAIL_PRIMARY
} else {
StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA
}
logError(
s"While handling PushData, throwing $cause because fileWriter $fileWriter has an exception.",
exception)
workerSource.incCounter(WorkerSource.WRITE_DATA_FAIL_COUNT)
callbackWithTimer.onFailure(new CelebornIOException(cause))
return
}
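// checkDiskFullAndSplit may answer the client itself (e.g. with HARD_SPLIT) and return
// true, in which case we stop here; otherwise it may only mark softSplit and let the push continue.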
if (checkDiskFullAndSplit(fileWriter, isPrimary, softSplit, callbackWithTimer)) return
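// Register a pending write before handing off to replication; it is decremented
// explicitly below if the writer turns out to be already closed.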
fileWriter.incrementPendingWrites()
// for primary, send data to replica
if (doReplicate) {
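// Retain the body: the replication task runs asynchronously and may outlive this
// handler invocation; it releases the buffer itself on its failure paths.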
pushData.body().retain()
replicateThreadPool.submit(new Runnable {
override def run(): Unit = {
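// Rebuild the peer's WorkerInfo from the partition location so it can be looked up
// in the unavailablePeers cache.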
val peer = location.getPeer
val peerWorker = new WorkerInfo(
peer.getHost,
peer.getRpcPort,
peer.getPushPort,
peer.getFetchPort,
peer.getReplicatePort)
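// Skip peers that recently failed to connect (recorded in unavailablePeers) and fail
// this push immediately instead of retrying a bad connection.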
if (unavailablePeers.containsKey(peerWorker)) {
pushData.body().release()
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushData replication failed because the peer is unavailable for partitionLocation: $location")
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
return
}
// Handle the response from replica
val wrappedCallback = new RpcResponseCallback() {
override def onSuccess(response: ByteBuffer): Unit = {
if (response.remaining() > 0) {
val resp = ByteBuffer.allocate(response.remaining())
resp.put(response)
resp.flip()
callbackWithTimer.onSuccess(resp)
} else if (softSplit.get()) {
// TODO: Currently, if the worker is in soft split status, we assume the client will
// quickly stop pushing data to it, so we don't return the congestion status. In the
// long term, especially if this happens frequently, we may need to return the
// congestion and soft split statuses together.
callbackWithTimer.onSuccess(
ByteBuffer.wrap(Array[Byte](StatusCode.SOFT_SPLIT.getValue)))
} else {
Option(CongestionController.instance()) match {
case Some(congestionController) =>
if (congestionController.isUserCongested(
fileWriter.getFileInfo.getUserIdentifier)) {
// Check whether the primary congests the data even though the replica doesn't
// (the response is empty).
callbackWithTimer.onSuccess(
ByteBuffer.wrap(
Array[Byte](StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue)))
} else {
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte]()))
}
case None =>
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte]()))
}
}
}
override def onFailure(e: Throwable): Unit = {
logError(s"PushData replication failed for partitionLocation: $location", e)
// 1. PUSH_DATA_WRITE_FAIL_REPLICA is thrown by the replica peer worker
// 2. PUSH_DATA_TIMEOUT_REPLICA is thrown by TransportResponseHandler
// 3. IOException is thrown by the channel and converted to PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
if (e.getMessage.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_WRITE_FAIL_COUNT)
callbackWithTimer.onFailure(e)
} else if (e.getMessage.startsWith(StatusCode.PUSH_DATA_TIMEOUT_REPLICA.name())) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_TIMEOUT_COUNT)
callbackWithTimer.onFailure(e)
} else {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA))
}
}
}
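// Forward the same payload to the peer as a REPLICA-mode PushData over its replicate
// port; a connection failure marks the peer unavailable for subsequent pushes.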
try {
val client = getClient(peer.getHost, peer.getReplicatePort, location.getId)
val newPushData = new PushData(
PartitionLocation.Mode.REPLICA.mode(),
shuffleKey,
pushData.partitionUniqueId,
pushData.body)
client.pushData(newPushData, shufflePushDataTimeout.get(shuffleKey), wrappedCallback)
} catch {
case e: Exception =>
pushData.body().release()
unavailablePeers.put(peerWorker, System.currentTimeMillis())
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushData replication failed during connecting peer for partitionLocation: $location",
e)
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
}
}
})
} else {
// This code path is executed when:
// 1. the client doesn't enable pushing data to the replica, so the primary worker lands here
// 2. the client enables pushing data to the replica, so the replica worker lands here
// TODO: Currently, if the worker is in soft split status, we assume the client will
// quickly stop pushing data to it, so we don't return the congestion status. In the
// long term, especially if this happens frequently, we may need to return the
// congestion and soft split statuses together.
if (softSplit.get()) {
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.SOFT_SPLIT.getValue)))
} else {
Option(CongestionController.instance()) match {
case Some(congestionController) =>
if (congestionController.isUserCongested(fileWriter.getFileInfo.getUserIdentifier)) {
if (isPrimary) {
callbackWithTimer.onSuccess(
ByteBuffer.wrap(
Array[Byte](StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue)))
} else {
callbackWithTimer.onSuccess(
ByteBuffer.wrap(
Array[Byte](StatusCode.PUSH_DATA_SUCCESS_REPLICA_CONGESTED.getValue)))
}
} else {
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte]()))
}
case None =>
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte]()))
}
}
}
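// Append the data locally. The client has already been answered (or replication scheduled)
// above; a write failure here likely surfaces via fileWriter.getException on later pushes.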
try {
fileWriter.write(body)
} catch {
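// AlreadyClosedException typically means the file was already committed, e.g. by a
// CommitFiles triggered by stage end or a hard split, so it is only logged as a warning.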
case e: AlreadyClosedException =>
fileWriter.decrementPendingWrites()
val (mapId, attemptId) = getMapAttempt(body)
val endedAttempt =
if (shuffleMapperAttempts.containsKey(shuffleKey)) {
shuffleMapperAttempts.get(shuffleKey).get(mapId)
} else -1
// TODO: log at info level only when the attempt has already ended
logWarning(s"Append data failed for task(shuffle $shuffleKey, map $mapId, attempt" +
s" $attemptId), caused by AlreadyClosedException, endedAttempt $endedAttempt, error message: ${e.getMessage}")
case e: Exception =>
logError("Exception encountered when write.", e)
}
}