def fromTransportMessage()

in common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala [1044:1460]


  def fromTransportMessage(message: TransportMessage): Any = {
    // This can be removed when Transport Message removes type field support later.
    val messageTypeValue = message.getMessageTypeValue match {
      case UNKNOWN_MESSAGE_VALUE => message.getType.getNumber
      case _ => message.getMessageTypeValue
    }

    messageTypeValue match {
      case UNKNOWN_MESSAGE_VALUE =>
        val msg = s"received unknown message $message"
        logError(msg)
        throw new UnsupportedOperationException(msg)

      // keep it for compatible reason
      case RELEASE_SLOTS_VALUE =>
        val pbReleaseSlots = PbReleaseSlots.parseFrom(message.getPayload)
        val slotsList = pbReleaseSlots.getSlotsList.asScala.map(pbSlot =>
          new util.HashMap[String, Integer](pbSlot.getSlotMap)).toList.asJava
        ReleaseSlots(
          pbReleaseSlots.getApplicationId,
          pbReleaseSlots.getShuffleId,
          new util.ArrayList[String](pbReleaseSlots.getWorkerIdsList),
          new util.ArrayList[util.Map[String, Integer]](slotsList),
          pbReleaseSlots.getRequestId)

      case RELEASE_SLOTS_RESPONSE_VALUE =>
        val pbReleaseSlotsResponse = PbReleaseSlotsResponse.parseFrom(message.getPayload)
        ReleaseSlotsResponse(StatusCode.fromValue(pbReleaseSlotsResponse.getStatus))

      case REGISTER_WORKER_VALUE =>
        PbRegisterWorker.parseFrom(message.getPayload)

      case HEARTBEAT_FROM_WORKER_VALUE =>
        val pbHeartbeatFromWorker = PbHeartbeatFromWorker.parseFrom(message.getPayload)
        val userResourceConsumption = PbSerDeUtils.fromPbUserResourceConsumption(
          pbHeartbeatFromWorker.getUserResourceConsumptionMap)
        val pbDisks =
          pbHeartbeatFromWorker.getDisksList.asScala.toSeq.map(PbSerDeUtils.fromPbDiskInfo)
        val activeShuffleKeys = new util.HashSet[String]()
        if (!pbHeartbeatFromWorker.getActiveShuffleKeysList.isEmpty) {
          activeShuffleKeys.addAll(pbHeartbeatFromWorker.getActiveShuffleKeysList)
        }

        val workerStatus = PbSerDeUtils.fromPbWorkerStatus(pbHeartbeatFromWorker.getWorkerStatus)

        HeartbeatFromWorker(
          pbHeartbeatFromWorker.getHost,
          pbHeartbeatFromWorker.getRpcPort,
          pbHeartbeatFromWorker.getPushPort,
          pbHeartbeatFromWorker.getFetchPort,
          pbHeartbeatFromWorker.getReplicatePort,
          pbDisks,
          userResourceConsumption,
          activeShuffleKeys,
          pbHeartbeatFromWorker.getHighWorkload,
          workerStatus,
          pbHeartbeatFromWorker.getRequestId)

      case HEARTBEAT_FROM_WORKER_RESPONSE_VALUE =>
        val pbHeartbeatFromWorkerResponse =
          PbHeartbeatFromWorkerResponse.parseFrom(message.getPayload)
        val expiredShuffleKeys = new util.HashSet[String]()
        if (pbHeartbeatFromWorkerResponse.getExpiredShuffleKeysCount > 0) {
          expiredShuffleKeys.addAll(pbHeartbeatFromWorkerResponse.getExpiredShuffleKeysList)
        }

        HeartbeatFromWorkerResponse(
          expiredShuffleKeys,
          pbHeartbeatFromWorkerResponse.getRegistered,
          pbHeartbeatFromWorkerResponse.getWorkerEventType)

      case REGISTER_SHUFFLE_VALUE =>
        PbRegisterShuffle.parseFrom(message.getPayload)

      case REGISTER_MAP_PARTITION_TASK_VALUE =>
        PbRegisterMapPartitionTask.parseFrom(message.getPayload)

      case REGISTER_SHUFFLE_RESPONSE_VALUE =>
        PbRegisterShuffleResponse.parseFrom(message.getPayload)

      case REQUEST_SLOTS_VALUE =>
        val pbRequestSlots = PbRequestSlots.parseFrom(message.getPayload)
        val userIdentifier = PbSerDeUtils.fromPbUserIdentifier(pbRequestSlots.getUserIdentifier)
        val excludedWorkerInfoSet =
          pbRequestSlots.getExcludedWorkerSetList.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toSet
        RequestSlots(
          pbRequestSlots.getApplicationId,
          pbRequestSlots.getShuffleId,
          new util.ArrayList[Integer](pbRequestSlots.getPartitionIdListList),
          pbRequestSlots.getHostname,
          pbRequestSlots.getShouldReplicate,
          pbRequestSlots.getShouldRackAware,
          userIdentifier,
          pbRequestSlots.getMaxWorkers,
          pbRequestSlots.getAvailableStorageTypes,
          excludedWorkerInfoSet,
          pbRequestSlots.getPacked,
          pbRequestSlots.getTagsExpr,
          pbRequestSlots.getRequestId)

      case REQUEST_SLOTS_RESPONSE_VALUE =>
        val pbRequestSlotsResponse = PbRequestSlotsResponse.parseFrom(message.getPayload)
        val workerResource =
          if (pbRequestSlotsResponse.getWorkerResourceMap.isEmpty) {
            PbSerDeUtils.fromPbPackedWorkerResource(
              pbRequestSlotsResponse.getPackedWorkerResourceMap)
          } else {
            PbSerDeUtils.fromPbWorkerResource(
              pbRequestSlotsResponse.getWorkerResourceMap)
          }
        RequestSlotsResponse(
          StatusCode.fromValue(pbRequestSlotsResponse.getStatus),
          workerResource)

      case CHANGE_LOCATION_VALUE =>
        PbRevive.parseFrom(message.getPayload)

      case CHANGE_LOCATION_RESPONSE_VALUE =>
        PbChangeLocationResponse.parseFrom(message.getPayload)

      case MAPPER_END_VALUE =>
        val pbMapperEnd = PbMapperEnd.parseFrom(message.getPayload)
        MapperEnd(
          pbMapperEnd.getShuffleId,
          pbMapperEnd.getMapId,
          pbMapperEnd.getAttemptId,
          pbMapperEnd.getNumMappers,
          pbMapperEnd.getPartitionId,
          pbMapperEnd.getPushFailureBatchesMap.asScala.map {
            case (partitionId, pushFailedBatchSet) =>
              (partitionId, PbSerDeUtils.fromPbPushFailedBatchSet(pushFailedBatchSet))
          }.toMap.asJava)

      case MAPPER_END_RESPONSE_VALUE =>
        val pbMapperEndResponse = PbMapperEndResponse.parseFrom(message.getPayload)
        MapperEndResponse(StatusCode.fromValue(pbMapperEndResponse.getStatus))

      case GET_REDUCER_FILE_GROUP_VALUE =>
        val pbGetReducerFileGroup = PbGetReducerFileGroup.parseFrom(message.getPayload)
        GetReducerFileGroup(
          pbGetReducerFileGroup.getShuffleId,
          pbGetReducerFileGroup.getIsSegmentGranularityVisible,
          message.getSerdeVersion)

      case GET_REDUCER_FILE_GROUP_RESPONSE_VALUE =>
        val pbGetReducerFileGroupResponse = PbGetReducerFileGroupResponse
          .parseFrom(message.getPayload)
        val fileGroup = pbGetReducerFileGroupResponse.getFileGroupsMap.asScala.map {
          case (partitionId, fileGroup) =>
            val locationsSet: java.util.Set[PartitionLocation] =
              new util.LinkedHashSet[PartitionLocation]()

            // In PbGetReducerFileGroupResponse, location with same
            // uniqueId will not be put into the location set
            // check out the logic @org.apache.celeborn.client.commit.CommitHandler.parallelCommitFiles
            // This is why we should join the primary location list and replica location list

            val (pris, reps) = PbSerDeUtils.fromPbPackedPartitionLocationsPair(
              fileGroup.getPartitionLocationsPair)
            locationsSet.addAll(pris)
            locationsSet.addAll(reps)
            (
              partitionId,
              locationsSet)
        }.asJava

        val attempts = pbGetReducerFileGroupResponse.getAttemptsList.asScala.map(_.toInt).toArray
        val partitionIds = new util.HashSet(pbGetReducerFileGroupResponse.getPartitionIdsList)
        val pushFailedBatches = pbGetReducerFileGroupResponse.getPushFailedBatchesMap.asScala.map {
          case (uniqueId, pushFailedBatchSet) =>
            (uniqueId, PbSerDeUtils.fromPbPushFailedBatchSet(pushFailedBatchSet))
        }.toMap.asJava
        val broadcast = pbGetReducerFileGroupResponse.getBroadcast.toByteArray
        GetReducerFileGroupResponse(
          StatusCode.fromValue(pbGetReducerFileGroupResponse.getStatus),
          fileGroup,
          attempts,
          partitionIds,
          pushFailedBatches,
          broadcast)

      case GET_SHUFFLE_ID_VALUE =>
        message.getParsedPayload()

      case GET_SHUFFLE_ID_RESPONSE_VALUE =>
        message.getParsedPayload()

      case REPORT_SHUFFLE_FETCH_FAILURE_VALUE =>
        message.getParsedPayload()

      case REPORT_SHUFFLE_FETCH_FAILURE_RESPONSE_VALUE =>
        message.getParsedPayload()

      case UNREGISTER_SHUFFLE_VALUE =>
        PbUnregisterShuffle.parseFrom(message.getPayload)

      case BATCH_UNREGISTER_SHUFFLES_VALUE =>
        PbBatchUnregisterShuffles.parseFrom(message.getPayload)

      case UNREGISTER_SHUFFLE_RESPONSE_VALUE =>
        PbUnregisterShuffleResponse.parseFrom(message.getPayload)

      case BATCH_UNREGISTER_SHUFFLE_RESPONSE_VALUE =>
        PbBatchUnregisterShuffleResponse.parseFrom(message.getPayload)

      case APPLICATION_LOST_VALUE =>
        val pbApplicationLost = PbApplicationLost.parseFrom(message.getPayload)
        ApplicationLost(pbApplicationLost.getAppId, pbApplicationLost.getRequestId)

      case APPLICATION_LOST_RESPONSE_VALUE =>
        val pbApplicationLostResponse = PbApplicationLostResponse.parseFrom(message.getPayload)
        ApplicationLostResponse(StatusCode.fromValue(pbApplicationLostResponse.getStatus))

      case HEARTBEAT_FROM_APPLICATION_VALUE =>
        val pbHeartbeatFromApplication = PbHeartbeatFromApplication.parseFrom(message.getPayload)
        HeartbeatFromApplication(
          pbHeartbeatFromApplication.getAppId,
          pbHeartbeatFromApplication.getTotalWritten,
          pbHeartbeatFromApplication.getFileCount,
          pbHeartbeatFromApplication.getShuffleCount,
          pbHeartbeatFromApplication.getShuffleFallbackCountsMap,
          new util.ArrayList[WorkerInfo](
            pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala
              .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
          pbHeartbeatFromApplication.getRequestId,
          pbHeartbeatFromApplication.getShouldResponse)

      case HEARTBEAT_FROM_APPLICATION_RESPONSE_VALUE =>
        val pbHeartbeatFromApplicationResponse =
          PbHeartbeatFromApplicationResponse.parseFrom(message.getPayload)
        val pbCheckQuotaResponse = pbHeartbeatFromApplicationResponse.getCheckQuotaResponse
        HeartbeatFromApplicationResponse(
          StatusCode.fromValue(pbHeartbeatFromApplicationResponse.getStatus),
          pbHeartbeatFromApplicationResponse.getExcludedWorkersList.asScala
            .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
          pbHeartbeatFromApplicationResponse.getUnknownWorkersList.asScala
            .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
          pbHeartbeatFromApplicationResponse.getShuttingWorkersList.asScala
            .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
          pbHeartbeatFromApplicationResponse.getRegisteredShufflesList,
          CheckQuotaResponse(pbCheckQuotaResponse.getAvailable, pbCheckQuotaResponse.getReason))

      case CHECK_QUOTA_VALUE =>
        val pbCheckAvailable = PbCheckQuota.parseFrom(message.getPayload)
        CheckQuota(PbSerDeUtils.fromPbUserIdentifier(pbCheckAvailable.getUserIdentifier))

      case CHECK_QUOTA_RESPONSE_VALUE =>
        val pbCheckAvailableResponse = PbCheckQuotaResponse
          .parseFrom(message.getPayload)
        CheckQuotaResponse(
          pbCheckAvailableResponse.getAvailable,
          pbCheckAvailableResponse.getReason)

      case REPORT_WORKER_FAILURE_VALUE =>
        val pbReportWorkerUnavailable = PbReportWorkerUnavailable.parseFrom(message.getPayload)
        ReportWorkerUnavailable(
          new util.ArrayList[WorkerInfo](pbReportWorkerUnavailable.getUnavailableList
            .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
          pbReportWorkerUnavailable.getRequestId)

      case REPORT_WORKER_DECOMMISSION_VALUE =>
        val pbReportWorkerDecommission = PbReportWorkerDecommission.parseFrom(message.getPayload)
        ReportWorkerDecommission(
          new util.ArrayList[WorkerInfo](pbReportWorkerDecommission.getWorkersList
            .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
          pbReportWorkerDecommission.getRequestId)

      case REMOVE_WORKERS_UNAVAILABLE_INFO_VALUE =>
        PbRemoveWorkersUnavailableInfo.parseFrom(message.getPayload)

      case REGISTER_WORKER_RESPONSE_VALUE =>
        PbRegisterWorkerResponse.parseFrom(message.getPayload)

      case WORKER_EVENT_REQUEST_VALUE =>
        PbWorkerEventRequest.parseFrom(message.getPayload)

      case WORKER_EVENT_RESPONSE_VALUE =>
        PbWorkerEventResponse.parseFrom(message.getPayload)

      case RESERVE_SLOTS_VALUE =>
        val pbReserveSlots = PbReserveSlots.parseFrom(message.getPayload)
        val userIdentifier = PbSerDeUtils.fromPbUserIdentifier(pbReserveSlots.getUserIdentifier)
        val (primaryLocations, replicateLocations) =
          if (pbReserveSlots.getPrimaryLocationsList.isEmpty && pbReserveSlots.getReplicaLocationsList.isEmpty) {
            PbSerDeUtils.fromPbPackedPartitionLocationsPair(
              pbReserveSlots.getPartitionLocationsPair)
          } else {
            (
              new util.ArrayList[PartitionLocation](pbReserveSlots.getPrimaryLocationsList.asScala
                .map(PbSerDeUtils.fromPbPartitionLocation).toList.asJava),
              new util.ArrayList[PartitionLocation](pbReserveSlots.getReplicaLocationsList.asScala
                .map(PbSerDeUtils.fromPbPartitionLocation).toList.asJava))
          }
        ReserveSlots(
          pbReserveSlots.getApplicationId,
          pbReserveSlots.getShuffleId,
          primaryLocations,
          replicateLocations,
          pbReserveSlots.getSplitThreshold,
          Utils.toShuffleSplitMode(pbReserveSlots.getSplitMode),
          Utils.toPartitionType(pbReserveSlots.getPartitionType),
          pbReserveSlots.getRangeReadFilter,
          userIdentifier,
          pbReserveSlots.getPushDataTimeout,
          pbReserveSlots.getPartitionSplitEnabled,
          pbReserveSlots.getIsSegmentGranularityVisible)

      case RESERVE_SLOTS_RESPONSE_VALUE =>
        val pbReserveSlotsResponse = PbReserveSlotsResponse.parseFrom(message.getPayload)
        ReserveSlotsResponse(
          StatusCode.fromValue(pbReserveSlotsResponse.getStatus),
          pbReserveSlotsResponse.getReason)

      case COMMIT_FILES_VALUE =>
        val pbCommitFiles = PbCommitFiles.parseFrom(message.getPayload)
        CommitFiles(
          pbCommitFiles.getApplicationId,
          pbCommitFiles.getShuffleId,
          pbCommitFiles.getPrimaryIdsList,
          pbCommitFiles.getReplicaIdsList,
          pbCommitFiles.getMapAttemptsList.asScala.map(_.toInt).toArray,
          pbCommitFiles.getEpoch,
          pbCommitFiles.getMockFailure)

      case COMMIT_FILES_RESPONSE_VALUE =>
        val pbCommitFilesResponse = PbCommitFilesResponse.parseFrom(message.getPayload)
        val committedPrimaryStorageInfos = new util.HashMap[String, StorageInfo]()
        val committedReplicaStorageInfos = new util.HashMap[String, StorageInfo]()
        val committedBitMap = new util.HashMap[String, RoaringBitmap]()
        pbCommitFilesResponse.getCommittedPrimaryStorageInfosMap.asScala.foreach(entry =>
          committedPrimaryStorageInfos.put(entry._1, StorageInfo.fromPb(entry._2)))
        pbCommitFilesResponse.getCommittedReplicaStorageInfosMap.asScala.foreach(entry =>
          committedReplicaStorageInfos.put(entry._1, StorageInfo.fromPb(entry._2)))
        pbCommitFilesResponse.getMapIdBitmapMap.asScala.foreach { entry =>
          committedBitMap.put(entry._1, Utils.byteStringToRoaringBitmap(entry._2))
        }
        CommitFilesResponse(
          StatusCode.fromValue(pbCommitFilesResponse.getStatus),
          pbCommitFilesResponse.getCommittedPrimaryIdsList,
          pbCommitFilesResponse.getCommittedReplicaIdsList,
          pbCommitFilesResponse.getFailedPrimaryIdsList,
          pbCommitFilesResponse.getFailedReplicaIdsList,
          committedPrimaryStorageInfos,
          committedReplicaStorageInfos,
          committedBitMap,
          pbCommitFilesResponse.getTotalWritten,
          pbCommitFilesResponse.getFileCount)

      case DESTROY_VALUE =>
        val pbDestroy = PbDestroyWorkerSlots.parseFrom(message.getPayload)
        DestroyWorkerSlots(
          pbDestroy.getShuffleKey,
          pbDestroy.getPrimaryLocationsList,
          pbDestroy.getReplicaLocationList,
          pbDestroy.getMockFailure)

      case DESTROY_RESPONSE_VALUE =>
        val pbDestroyResponse = PbDestroyWorkerSlotsResponse.parseFrom(message.getPayload)
        DestroyWorkerSlotsResponse(
          StatusCode.fromValue(pbDestroyResponse.getStatus),
          pbDestroyResponse.getFailedPrimariesList,
          pbDestroyResponse.getFailedReplicasList)

      case REMOVE_EXPIRED_SHUFFLE_VALUE =>
        RemoveExpiredShuffle

      case ONE_WAY_MESSAGE_RESPONSE_VALUE =>
        OneWayMessageResponse

      case CHECK_WORKER_TIMEOUT_VALUE =>
        pbCheckForWorkerTimeout

      case CHECK_APPLICATION_TIMEOUT_VALUE =>
        CheckForApplicationTimeOut

      case CHECK_FOR_DFS_EXPIRED_DIRS_TIMEOUT_VALUE =>
        CheckForDFSExpiredDirsTimeout

      case WORKER_LOST_VALUE =>
        PbWorkerLost.parseFrom(message.getPayload)

      case WORKER_LOST_RESPONSE_VALUE =>
        PbWorkerLostResponse.parseFrom(message.getPayload)

      case STAGE_END_VALUE =>
        val pbStageEnd = PbStageEnd.parseFrom(message.getPayload)
        StageEnd(pbStageEnd.getShuffleId)

      case PARTITION_SPLIT_VALUE =>
        PbPartitionSplit.parseFrom(message.getPayload)

      case STAGE_END_RESPONSE_VALUE =>
        val pbStageEndResponse = PbStageEndResponse.parseFrom(message.getPayload)
        StageEndResponse(StatusCode.fromValue(pbStageEndResponse.getStatus))

      case CHECK_WORKERS_AVAILABLE_VALUE =>
        PbCheckWorkersAvailable.parseFrom(message.getPayload)

      case CHECK_WORKERS_AVAILABLE_RESPONSE_VALUE =>
        PbCheckWorkersAvailableResponse.parseFrom(message.getPayload)

      case APPLICATION_META_VALUE =>
        PbApplicationMeta.parseFrom(message.getPayload)

      case APPLICATION_META_REQUEST_VALUE =>
        PbApplicationMetaRequest.parseFrom(message.getPayload)

      case REPORT_BARRIER_STAGE_ATTEMPT_FAILURE_VALUE =>
        PbReportBarrierStageAttemptFailure.parseFrom(message.getPayload)

      case REPORT_BARRIER_STAGE_ATTEMPT_FAILURE_RESPONSE_VALUE =>
        PbReportBarrierStageAttemptFailureResponse.parseFrom(message.getPayload)

      case PUSH_MERGED_DATA_SPLIT_PARTITION_INFO_VALUE =>
        PbPushMergedDataSplitPartitionInfo.parseFrom(message.getPayload)
    }
  }