in client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala [247:364]
/**
 * Sends a `CommitFiles` request for the given partition ids to `worker` and folds the
 * response into `shuffleCommittedInfo`.
 *
 * Returns immediately, without contacting the worker, when both id lists are empty.
 *
 * Normal path: if `conf.clientCommitFilesIgnoreExcludedWorkers` is set and the worker is
 * present in `workerStatusTracker.excludedWorkers`, no request is sent; a response with
 * status `WORKER_EXCLUDED` is synthesized with every requested id placed in the failed
 * lists. Otherwise the request goes through `requestCommitFilesWithRetry`. A response
 * status of `PARTIAL_SUCCESS`, `SHUFFLE_NOT_REGISTERED` or `REQUEST_FAILED` records the
 * worker in `commitFilesFailedWorkers`.
 *
 * Test path (`testRetryCommitFiles`): the ids are split in half, two separate requests are
 * sent, and the two responses are merged into a single one.
 *
 * @param applicationId            application the shuffle belongs to
 * @param shuffleId                shuffle whose files are being committed
 * @param shuffleCommittedInfo     accumulator updated (under its own monitor) with the
 *                                 committed/failed ids, storage infos, map-id bitmaps and
 *                                 file count from the response
 * @param worker                   worker asked to commit
 * @param primaryIds               unique ids of the primary partition locations to commit
 * @param replicaIds               unique ids of the replica partition locations to commit
 * @param commitFilesFailedWorkers receives `worker` with the response status and the
 *                                 current time when the commit did not fully succeed
 */
def commitFiles(
    applicationId: String,
    shuffleId: Int,
    shuffleCommittedInfo: ShuffleCommittedInfo,
    worker: WorkerInfo,
    primaryIds: util.List[String],
    replicaIds: util.List[String],
    commitFilesFailedWorkers: ShuffleFailedWorkers): Unit = {
  if (CollectionUtils.isEmpty(primaryIds) && CollectionUtils.isEmpty(replicaIds)) {
    return
  }

  val res =
    if (!testRetryCommitFiles) {
      val commitFiles = CommitFiles(
        applicationId,
        shuffleId,
        primaryIds,
        replicaIds,
        getMapperAttempts(shuffleId),
        commitEpoch.incrementAndGet())

      val response =
        if (conf.clientCommitFilesIgnoreExcludedWorkers &&
          workerStatusTracker.excludedWorkers.containsKey(worker)) {
          // Skip the RPC for an excluded worker: nothing is committed and every
          // requested id is reported as failed.
          CommitFilesResponse(
            StatusCode.WORKER_EXCLUDED,
            List.empty.asJava,
            List.empty.asJava,
            primaryIds,
            replicaIds)
        } else {
          requestCommitFilesWithRetry(worker.endpoint, commitFiles)
        }

      response.status match {
        case StatusCode.SUCCESS => // do nothing
        case StatusCode.PARTIAL_SUCCESS |
            StatusCode.SHUFFLE_NOT_REGISTERED |
            StatusCode.REQUEST_FAILED |
            StatusCode.WORKER_EXCLUDED =>
          logInfo(s"Request $commitFiles return ${response.status} for " +
            s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
          // A worker that was skipped because it is excluded is not recorded again
          // as a commit-failed worker.
          if (response.status != StatusCode.WORKER_EXCLUDED) {
            commitFilesFailedWorkers.put(worker, (response.status, System.currentTimeMillis()))
          }
        case _ =>
          logError(s"Should never reach here! commit files response status ${response.status}")
      }
      response
    } else {
      // for test: commit the first and second half of the ids in two separate requests
      val primaryMid = primaryIds.size() / 2
      val replicaMid = replicaIds.size() / 2

      val commitFiles1 = CommitFiles(
        applicationId,
        shuffleId,
        primaryIds.subList(0, primaryMid),
        replicaIds.subList(0, replicaMid),
        getMapperAttempts(shuffleId),
        commitEpoch.incrementAndGet())
      val res1 = requestCommitFilesWithRetry(worker.endpoint, commitFiles1)

      val commitFiles2 = CommitFiles(
        applicationId,
        shuffleId,
        primaryIds.subList(primaryMid, primaryIds.size()),
        replicaIds.subList(replicaMid, replicaIds.size()),
        getMapperAttempts(shuffleId),
        commitEpoch.incrementAndGet())
      val res2 = requestCommitFilesWithRetry(worker.endpoint, commitFiles2)

      // Merge the second response's maps into the first response's maps in place;
      // the merged maps are then reused for the combined response below.
      res1.committedPrimaryStorageInfos.putAll(res2.committedPrimaryStorageInfos)
      res1.committedReplicaStorageInfos.putAll(res2.committedReplicaStorageInfos)
      res1.committedMapIdBitMap.putAll(res2.committedMapIdBitMap)

      // Every id list must combine res1 with res2; pairing res1 with itself would drop
      // the second half's ids and duplicate the first half's.
      CommitFilesResponse(
        status = if (res1.status == StatusCode.SUCCESS) res2.status else res1.status,
        (res1.committedPrimaryIds.asScala ++ res2.committedPrimaryIds.asScala).toList.asJava,
        (res1.committedReplicaIds.asScala ++ res2.committedReplicaIds.asScala).toList.asJava,
        (res1.failedPrimaryIds.asScala ++ res2.failedPrimaryIds.asScala).toList.asJava,
        (res1.failedReplicaIds.asScala ++ res2.failedReplicaIds.asScala).toList.asJava,
        res1.committedPrimaryStorageInfos,
        res1.committedReplicaStorageInfos,
        res1.committedMapIdBitMap,
        res1.totalWritten + res2.totalWritten,
        res1.fileCount + res2.fileCount)
    }

  shuffleCommittedInfo.synchronized {
    // record committed partitionIds, grouped by the first component of the split unique id
    res.committedPrimaryIds.asScala.foreach { commitPrimaryId =>
      val partitionUniqueIdList = shuffleCommittedInfo.committedPrimaryIds.computeIfAbsent(
        Utils.splitPartitionLocationUniqueId(commitPrimaryId)._1,
        (k: Int) => new util.ArrayList[String]())
      partitionUniqueIdList.add(commitPrimaryId)
    }
    res.committedReplicaIds.asScala.foreach { commitReplicaId =>
      val partitionUniqueIdList = shuffleCommittedInfo.committedReplicaIds.computeIfAbsent(
        Utils.splitPartitionLocationUniqueId(commitReplicaId)._1,
        (k: Int) => new util.ArrayList[String]())
      partitionUniqueIdList.add(commitReplicaId)
    }

    // record committed partitions storage hint and disk hint
    shuffleCommittedInfo.committedPrimaryStorageInfos.putAll(res.committedPrimaryStorageInfos)
    shuffleCommittedInfo.committedReplicaStorageInfos.putAll(res.committedReplicaStorageInfos)

    // record failed partitions, each mapped to the worker that failed to commit it
    shuffleCommittedInfo.failedPrimaryPartitionIds.putAll(
      res.failedPrimaryIds.asScala.map((_, worker)).toMap.asJava)
    shuffleCommittedInfo.failedReplicaPartitionIds.putAll(
      res.failedReplicaIds.asScala.map((_, worker)).toMap.asJava)

    shuffleCommittedInfo.committedMapIdBitmap.putAll(res.committedMapIdBitMap)

    totalWritten.add(res.totalWritten)
    fileCount.add(res.fileCount)
    shuffleCommittedInfo.currentShuffleFileCount.add(res.fileCount)
  }
}