in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala [386:659]
def handlePushMergedData(
pushMergedData: PushMergedData,
callback: RpcResponseCallback): Unit = {
val shuffleKey = pushMergedData.shuffleKey
val mode = PartitionLocation.getMode(pushMergedData.mode)
val batchOffsets = pushMergedData.batchOffsets
val body = pushMergedData.body.asInstanceOf[NettyManagedBuffer].getBuf
val isPrimary = mode == PartitionLocation.Mode.PRIMARY
val key = s"${pushMergedData.requestId}"
val callbackWithTimer =
if (isPrimary) {
new RpcResponseCallbackWithTimer(
workerSource,
WorkerSource.PRIMARY_PUSH_DATA_TIME,
key,
callback)
} else {
new RpcResponseCallbackWithTimer(
workerSource,
WorkerSource.REPLICA_PUSH_DATA_TIME,
key,
callback)
}
// For tests only: simulate a push-data timeout once by not responding.
if (isPrimary && testPushPrimaryDataTimeout &&
!PushDataHandler.pushPrimaryMergeDataTimeoutTested.getAndSet(true)) {
return
}
if (!isPrimary && testPushReplicaDataTimeout &&
!PushDataHandler.pushReplicaMergeDataTimeoutTested.getAndSet(true)) {
return
}
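// Look up this worker's registered partition locations for every unique partition id in the merged batch.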
val partitionIdToLocations =
if (isPrimary) {
partitionLocationInfo.getPrimaryLocations(shuffleKey, pushMergedData.partitionUniqueIds)
} else {
partitionLocationInfo.getReplicaLocations(shuffleKey, pushMergedData.partitionUniqueIds)
}
// Fetching the real batchId from the body would add extra cost and has no meaning for replication.
val doReplicate =
partitionIdToLocations.head._2 != null && partitionIdToLocations.head._2.hasPeer && isPrimary
// Find the partition locations (and, below, the FileWriters) responsible for the data; reply early if any location is missing.
var index = 0
while (index < partitionIdToLocations.length) {
val (id, loc) = partitionIdToLocations(index)
if (loc == null) {
val (mapId, attemptId) = getMapAttempt(body)
// MapperAttempts for a shuffle exist once any CommitFiles request has succeeded.
// A shuffle can trigger multiple CommitFiles requests, e.g. when a HARD_SPLIT happens or at StageEnd.
// If MapperAttempts exists but the value for this mapId is -1 (-1 means the map has not finished yet),
// it is probably because CommitFiles was triggered by a HARD_SPLIT.
if (shuffleMapperAttempts.containsKey(shuffleKey)) {
if (-1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) {
logInfo(s"Receive push merged data from speculative " +
s"task(shuffle $shuffleKey, map $mapId, attempt $attemptId), " +
s"but this mapper has already been ended.")
callbackWithTimer.onSuccess(
ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
} else {
logInfo(s"[Case1] Receive push merged data for committed hard split partition of " +
s"(shuffle $shuffleKey, map $mapId attempt $attemptId)")
callbackWithTimer.onSuccess(
ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
}
} else {
if (storageManager.shuffleKeySet().contains(shuffleKey)) {
// The shuffle key is missing from shuffleMapperAttempts but present in StorageManager.
// This partition should be a HARD_SPLIT partition: after a worker restart, some tasks
// may still push data to it.
logInfo(s"[Case2] Received push merged data for a committed hard split partition of " +
s"(shuffle $shuffleKey, map $mapId, attempt $attemptId)")
callbackWithTimer.onSuccess(
ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
} else {
logWarning(s"While handling PushMergedData, Partition location wasn't found for " +
s"task(shuffle $shuffleKey, map $mapId, attempt $attemptId, uniqueId $id).")
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_FAIL_PARTITION_NOT_FOUND))
}
}
return
}
index += 1
}
// During worker shutdown, the worker returns HARD_SPLIT for all existing partitions.
// This must happen before returning any exception so that the current push data can be revived and retried.
if (shutdown.get()) {
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
return
}
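// Resolve the FileWriter for each partition location and fail fast if any writer has already recorded an exception.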
val (fileWriters, exceptionFileWriterIndexOpt) = getFileWriters(partitionIdToLocations)
if (exceptionFileWriterIndexOpt.isDefined) {
val fileWriterWithException = fileWriters(exceptionFileWriterIndexOpt.get)
val cause =
if (isPrimary) {
StatusCode.PUSH_DATA_WRITE_FAIL_PRIMARY
} else {
StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA
}
logError(
s"While handling PushMergedData, throw $cause, fileWriter $fileWriterWithException has exception.",
fileWriterWithException.getException)
workerSource.incCounter(WorkerSource.WRITE_DATA_FAIL_COUNT)
callbackWithTimer.onFailure(new CelebornIOException(cause))
return
}
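// Register a pending write on every FileWriter before the data is replicated and written.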
fileWriters.foreach(_.incrementPendingWrites())
// For the primary, send the data to the replica peer.
if (doReplicate) {
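// Retain the body so it remains valid for the asynchronous replication task; the failure paths below release it.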
pushMergedData.body().retain()
replicateThreadPool.submit(new Runnable {
override def run(): Unit = {
val location = partitionIdToLocations.head._2
val peer = location.getPeer
val peerWorker = new WorkerInfo(
peer.getHost,
peer.getRpcPort,
peer.getPushPort,
peer.getFetchPort,
peer.getReplicatePort)
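// Skip replication to peers that recently failed to connect; such peers are tracked in unavailablePeers.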
if (unavailablePeers.containsKey(peerWorker)) {
pushMergedData.body().release()
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushMergedData replication failed caused by unavailable peer for partitionLocation: $location")
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
return
}
// Handle the response from replica
val wrappedCallback = new RpcResponseCallback() {
override def onSuccess(response: ByteBuffer): Unit = {
// Only primary data with replication enabled is pushed to the replica,
// so forward a non-empty replica response to the client as-is.
if (response.remaining() > 0) {
val resp = ByteBuffer.allocate(response.remaining())
resp.put(response)
resp.flip()
callbackWithTimer.onSuccess(resp)
} else {
Option(CongestionController.instance()) match {
case Some(congestionController) if fileWriters.nonEmpty =>
if (congestionController.isUserCongested(
fileWriters.head.getFileInfo.getUserIdentifier)) {
// Check whether the primary congests the data even though the replica
// doesn't (the response is empty).
callbackWithTimer.onSuccess(
ByteBuffer.wrap(
Array[Byte](StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue)))
} else {
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte]()))
}
// No CongestionController, or no file writers: respond with a plain success.
case _ =>
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte]()))
}
}
}
override def onFailure(e: Throwable): Unit = {
logError(s"PushMergedData replicate failed for partitionLocation: $location", e)
// 1. Throw PUSH_DATA_WRITE_FAIL_REPLICA by replica peer worker
// 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
// 3. Throw IOException by channel, convert to PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
if (e.getMessage.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_WRITE_FAIL_COUNT)
callbackWithTimer.onFailure(e)
} else if (e.getMessage.startsWith(StatusCode.PUSH_DATA_TIMEOUT_REPLICA.name())) {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_TIMEOUT_COUNT)
callbackWithTimer.onFailure(e)
} else {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA))
}
}
}
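// Forward the merged data to the replica in REPLICA mode, reusing the original batch offsets and body.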
try {
val client = getClient(peer.getHost, peer.getReplicatePort, location.getId)
val newPushMergedData = new PushMergedData(
PartitionLocation.Mode.REPLICA.mode(),
shuffleKey,
pushMergedData.partitionUniqueIds,
batchOffsets,
pushMergedData.body)
client.pushMergedData(
newPushMergedData,
shufflePushDataTimeout.get(shuffleKey),
wrappedCallback)
} catch {
case e: Exception =>
pushMergedData.body().release()
unavailablePeers.put(peerWorker, System.currentTimeMillis())
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushMergedData replication failed during connecting peer for partitionLocation: $location",
e)
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
}
}
})
} else {
// The code here is executed when:
// 1. the client doesn't enable pushing data to the replica, so the primary worker reaches this branch
// 2. the client enables pushing data to the replica, and this worker is the replica
Option(CongestionController.instance()) match {
case Some(congestionController) if fileWriters.nonEmpty =>
if (congestionController.isUserCongested(
fileWriters.head.getFileInfo.getUserIdentifier)) {
if (isPrimary) {
callbackWithTimer.onSuccess(
ByteBuffer.wrap(
Array[Byte](StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue)))
} else {
callbackWithTimer.onSuccess(
ByteBuffer.wrap(
Array[Byte](StatusCode.PUSH_DATA_SUCCESS_REPLICA_CONGESTED.getValue)))
}
} else {
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte]()))
}
// No CongestionController, or no file writers: respond with a plain success.
case _ =>
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte]()))
}
}
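// Write each client batch to its own FileWriter; batchOffsets marks where every batch starts inside the merged body.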
index = 0
var fileWriter: FileWriter = null
var alreadyClosed = false
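// Once any writer throws AlreadyClosedException, skip writing the remaining batches but still decrement their pending writes.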
while (index < fileWriters.length) {
fileWriter = fileWriters(index)
val offset = body.readerIndex() + batchOffsets(index)
val length =
if (index == fileWriters.length - 1) {
body.readableBytes() - batchOffsets(index)
} else {
batchOffsets(index + 1) - batchOffsets(index)
}
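// Slice the current batch out of the shared body buffer without copying.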
val batchBody = body.slice(offset, length)
try {
if (!alreadyClosed) {
fileWriter.write(batchBody)
} else {
fileWriter.decrementPendingWrites()
}
} catch {
case e: AlreadyClosedException =>
fileWriter.decrementPendingWrites()
alreadyClosed = true
val (mapId, attemptId) = getMapAttempt(body)
val endedAttempt =
if (shuffleMapperAttempts.containsKey(shuffleKey)) {
shuffleMapperAttempts.get(shuffleKey).get(mapId)
} else -1
// TODO: log at info level instead of warning when the attempt has already ended.
logWarning(s"Append data failed for task(shuffle $shuffleKey, map $mapId, attempt" +
s" $attemptId), caused by AlreadyClosedException, endedAttempt $endedAttempt, error message: ${e.getMessage}")
case e: Exception =>
logError("Exception encountered when write.", e)
}
index += 1
}
}