def commitFiles()

in client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala [247:364]


  /**
   * Asks `worker` to commit the given primary/replica partition files for a shuffle and
   * merges the worker's response into `shuffleCommittedInfo`.
   *
   * Returns immediately, without sending anything, when both id lists are empty according to
   * `CollectionUtils.isEmpty`.
   *
   * Normal path (`testRetryCommitFiles` is false):
   *  - If `conf.clientCommitFilesIgnoreExcludedWorkers` is set and the worker is currently in
   *    `workerStatusTracker.excludedWorkers`, no request is sent; a `WORKER_EXCLUDED` response
   *    is synthesized in which every requested id is reported as failed.
   *  - Otherwise the request is sent through `requestCommitFilesWithRetry`.
   *  - For `PARTIAL_SUCCESS`, `SHUFFLE_NOT_REGISTERED` and `REQUEST_FAILED` the worker is
   *    recorded in `commitFilesFailedWorkers` together with the status and the current time.
   *    `WORKER_EXCLUDED` is logged but not recorded there.
   *
   * Test path (`testRetryCommitFiles` is true): the id lists are split in halves, two separate
   * requests are sent, and the two responses are merged into a single one.
   *
   * @param applicationId            application the shuffle belongs to
   * @param shuffleId                shuffle whose files are committed
   * @param shuffleCommittedInfo     accumulator updated (under its own monitor) with the result
   * @param worker                   worker that receives the commit request
   * @param primaryIds               unique ids of the primary partition locations to commit
   * @param replicaIds               unique ids of the replica partition locations to commit
   * @param commitFilesFailedWorkers collects workers whose commit did not fully succeed
   */
  def commitFiles(
      applicationId: String,
      shuffleId: Int,
      shuffleCommittedInfo: ShuffleCommittedInfo,
      worker: WorkerInfo,
      primaryIds: util.List[String],
      replicaIds: util.List[String],
      commitFilesFailedWorkers: ShuffleFailedWorkers): Unit = {

    // Nothing to commit on this worker.
    if (CollectionUtils.isEmpty(primaryIds) && CollectionUtils.isEmpty(replicaIds)) {
      return
    }

    val res =
      if (!testRetryCommitFiles) {
        val commitFiles = CommitFiles(
          applicationId,
          shuffleId,
          primaryIds,
          replicaIds,
          getMapperAttempts(shuffleId),
          commitEpoch.incrementAndGet())
        val response =
          if (conf.clientCommitFilesIgnoreExcludedWorkers &&
            workerStatusTracker.excludedWorkers.containsKey(worker)) {
            // Skip the RPC for an excluded worker: nothing committed, everything failed.
            CommitFilesResponse(
              StatusCode.WORKER_EXCLUDED,
              List.empty.asJava,
              List.empty.asJava,
              primaryIds,
              replicaIds)
          } else {
            requestCommitFilesWithRetry(worker.endpoint, commitFiles)
          }

        response.status match {
          case StatusCode.SUCCESS => // do nothing
          case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED |
              StatusCode.REQUEST_FAILED | StatusCode.WORKER_EXCLUDED =>
            logInfo(s"Request $commitFiles return ${response.status} for " +
              s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
            // An excluded worker was never contacted, so it is not counted as a commit failure.
            if (response.status != StatusCode.WORKER_EXCLUDED) {
              commitFilesFailedWorkers.put(worker, (response.status, System.currentTimeMillis()))
            }
          case _ =>
            logError(s"Should never reach here! commit files response status ${response.status}")
        }
        response
      } else {
        // for test: commit the first and second half of the ids in two separate requests
        val primaryMid = primaryIds.size() / 2
        val replicaMid = replicaIds.size() / 2

        val commitFiles1 = CommitFiles(
          applicationId,
          shuffleId,
          primaryIds.subList(0, primaryMid),
          replicaIds.subList(0, replicaMid),
          getMapperAttempts(shuffleId),
          commitEpoch.incrementAndGet())
        val res1 = requestCommitFilesWithRetry(worker.endpoint, commitFiles1)

        val commitFiles2 = CommitFiles(
          applicationId,
          shuffleId,
          primaryIds.subList(primaryMid, primaryIds.size()),
          replicaIds.subList(replicaMid, replicaIds.size()),
          getMapperAttempts(shuffleId),
          commitEpoch.incrementAndGet())
        val res2 = requestCommitFilesWithRetry(worker.endpoint, commitFiles2)

        // Fold the second response's maps into the first one's (mutates res1 in place).
        res1.committedPrimaryStorageInfos.putAll(res2.committedPrimaryStorageInfos)
        res1.committedReplicaStorageInfos.putAll(res2.committedReplicaStorageInfos)
        res1.committedMapIdBitMap.putAll(res2.committedMapIdBitMap)
        CommitFilesResponse(
          // The first non-SUCCESS status wins.
          status = if (res1.status == StatusCode.SUCCESS) res2.status else res1.status,
          // Every id list must combine BOTH responses; otherwise ids from the second
          // request are silently lost.
          (res1.committedPrimaryIds.asScala ++ res2.committedPrimaryIds.asScala).toList.asJava,
          (res1.committedReplicaIds.asScala ++ res2.committedReplicaIds.asScala).toList.asJava,
          (res1.failedPrimaryIds.asScala ++ res2.failedPrimaryIds.asScala).toList.asJava,
          (res1.failedReplicaIds.asScala ++ res2.failedReplicaIds.asScala).toList.asJava,
          res1.committedPrimaryStorageInfos,
          res1.committedReplicaStorageInfos,
          res1.committedMapIdBitMap,
          res1.totalWritten + res2.totalWritten,
          res1.fileCount + res2.fileCount)
      }

    // All updates to the shared per-shuffle state happen under its monitor.
    shuffleCommittedInfo.synchronized {
      // record committed partitionIds, grouped by the first component of the unique id
      res.committedPrimaryIds.asScala.foreach { committedPrimaryId =>
        val partitionUniqueIdList = shuffleCommittedInfo.committedPrimaryIds.computeIfAbsent(
          Utils.splitPartitionLocationUniqueId(committedPrimaryId)._1,
          (k: Int) => new util.ArrayList[String]())
        partitionUniqueIdList.add(committedPrimaryId)
      }

      res.committedReplicaIds.asScala.foreach { committedReplicaId =>
        val partitionUniqueIdList = shuffleCommittedInfo.committedReplicaIds.computeIfAbsent(
          Utils.splitPartitionLocationUniqueId(committedReplicaId)._1,
          (k: Int) => new util.ArrayList[String]())
        partitionUniqueIdList.add(committedReplicaId)
      }

      // record committed partitions storage hint and disk hint
      shuffleCommittedInfo.committedPrimaryStorageInfos.putAll(res.committedPrimaryStorageInfos)
      shuffleCommittedInfo.committedReplicaStorageInfos.putAll(res.committedReplicaStorageInfos)

      // record failed partitions, remembering which worker failed to commit them
      shuffleCommittedInfo.failedPrimaryPartitionIds.putAll(
        res.failedPrimaryIds.asScala.map((_, worker)).toMap.asJava)
      shuffleCommittedInfo.failedReplicaPartitionIds.putAll(
        res.failedReplicaIds.asScala.map((_, worker)).toMap.asJava)

      shuffleCommittedInfo.committedMapIdBitmap.putAll(res.committedMapIdBitMap)

      // update handler-wide and per-shuffle size/file counters
      totalWritten.add(res.totalWritten)
      fileCount.add(res.fileCount)
      shuffleCommittedInfo.currentShuffleFileCount.add(res.fileCount)
    }
  }