def fromTransportMessage()

in common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala [736:1021]


  /**
   * Decodes a [[TransportMessage]] into the control message it carries.
   *
   * The message type is resolved first (see the fallback below), then the payload is parsed with
   * the matching protobuf `parseFrom`. Depending on the type, the result is either the parsed
   * protobuf object itself, a Scala message built from its fields, or a payload-less singleton
   * message (the payload is not read for those types).
   *
   * Exceptions raised by the `parseFrom` calls or by the conversion helpers are not caught here
   * and propagate to the caller.
   *
   * @param message the transport envelope holding the type information and the serialized payload
   * @return the decoded message; statically typed as `Any` because the branches return unrelated
   *         types
   * @throws UnsupportedOperationException if the resolved type is `UNKNOWN_MESSAGE_VALUE` or a
   *                                       value this method has no branch for
   */
  def fromTransportMessage(message: TransportMessage): Any = {
    // Prefer the numeric message type value; when it is UNKNOWN_MESSAGE_VALUE fall back to the
    // number of the `type` field.
    // This can be removed when Transport Message removes type field support later.
    val messageTypeValue = message.getMessageTypeValue match {
      case UNKNOWN_MESSAGE_VALUE => message.getType.getNumber
      case _ => message.getMessageTypeValue
    }

    messageTypeValue match {
      case UNKNOWN_MESSAGE_VALUE =>
        val msg = s"received unknown message $message"
        logError(msg)
        throw new UnsupportedOperationException(msg)

      case REGISTER_WORKER_VALUE =>
        PbRegisterWorker.parseFrom(message.getPayload)

      case HEARTBEAT_FROM_WORKER_VALUE =>
        val pbHeartbeatFromWorker = PbHeartbeatFromWorker.parseFrom(message.getPayload)
        val estimatedAppDiskUsage = new util.HashMap[String, java.lang.Long]()
        val userResourceConsumption = PbSerDeUtils.fromPbUserResourceConsumption(
          pbHeartbeatFromWorker.getUserResourceConsumptionMap)
        val pbDisks = pbHeartbeatFromWorker.getDisksList.asScala.map(PbSerDeUtils.fromPbDiskInfo)
        if (!pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap.isEmpty) {
          estimatedAppDiskUsage.putAll(pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap)
        }
        val activeShuffleKeys = new util.HashSet[String]()
        if (!pbHeartbeatFromWorker.getActiveShuffleKeysList.isEmpty) {
          activeShuffleKeys.addAll(pbHeartbeatFromWorker.getActiveShuffleKeysList)
        }
        HeartbeatFromWorker(
          pbHeartbeatFromWorker.getHost,
          pbHeartbeatFromWorker.getRpcPort,
          pbHeartbeatFromWorker.getPushPort,
          pbHeartbeatFromWorker.getFetchPort,
          pbHeartbeatFromWorker.getReplicatePort,
          pbDisks,
          userResourceConsumption,
          activeShuffleKeys,
          estimatedAppDiskUsage,
          pbHeartbeatFromWorker.getRequestId)

      case HEARTBEAT_FROM_WORKER_RESPONSE_VALUE =>
        val pbHeartbeatFromWorkerResponse =
          PbHeartbeatFromWorkerResponse.parseFrom(message.getPayload)
        val expiredShuffleKeys = new util.HashSet[String]()
        if (pbHeartbeatFromWorkerResponse.getExpiredShuffleKeysCount > 0) {
          expiredShuffleKeys.addAll(pbHeartbeatFromWorkerResponse.getExpiredShuffleKeysList)
        }
        HeartbeatFromWorkerResponse(expiredShuffleKeys, pbHeartbeatFromWorkerResponse.getRegistered)

      case REGISTER_SHUFFLE_VALUE =>
        PbRegisterShuffle.parseFrom(message.getPayload)

      case REGISTER_MAP_PARTITION_TASK_VALUE =>
        PbRegisterMapPartitionTask.parseFrom(message.getPayload)

      case REGISTER_SHUFFLE_RESPONSE_VALUE =>
        PbRegisterShuffleResponse.parseFrom(message.getPayload)

      case REQUEST_SLOTS_VALUE =>
        val pbRequestSlots = PbRequestSlots.parseFrom(message.getPayload)
        val userIdentifier = PbSerDeUtils.fromPbUserIdentifier(pbRequestSlots.getUserIdentifier)
        RequestSlots(
          pbRequestSlots.getApplicationId,
          pbRequestSlots.getShuffleId,
          new util.ArrayList[Integer](pbRequestSlots.getPartitionIdListList),
          pbRequestSlots.getHostname,
          pbRequestSlots.getShouldReplicate,
          pbRequestSlots.getShouldRackAware,
          userIdentifier,
          pbRequestSlots.getMaxWorkers,
          pbRequestSlots.getRequestId)

      case REQUEST_SLOTS_RESPONSE_VALUE =>
        val pbRequestSlotsResponse = PbRequestSlotsResponse.parseFrom(message.getPayload)
        RequestSlotsResponse(
          Utils.toStatusCode(pbRequestSlotsResponse.getStatus),
          PbSerDeUtils.fromPbWorkerResource(
            pbRequestSlotsResponse.getWorkerResourceMap))

      // CHANGE_LOCATION carries a PbRevive payload.
      case CHANGE_LOCATION_VALUE =>
        PbRevive.parseFrom(message.getPayload)

      case CHANGE_LOCATION_RESPONSE_VALUE =>
        PbChangeLocationResponse.parseFrom(message.getPayload)

      case MAPPER_END_VALUE =>
        val pbMapperEnd = PbMapperEnd.parseFrom(message.getPayload)
        MapperEnd(
          pbMapperEnd.getShuffleId,
          pbMapperEnd.getMapId,
          pbMapperEnd.getAttemptId,
          pbMapperEnd.getNumMappers,
          pbMapperEnd.getPartitionId)

      case MAPPER_END_RESPONSE_VALUE =>
        val pbMapperEndResponse = PbMapperEndResponse.parseFrom(message.getPayload)
        MapperEndResponse(Utils.toStatusCode(pbMapperEndResponse.getStatus))

      case GET_REDUCER_FILE_GROUP_VALUE =>
        val pbGetReducerFileGroup = PbGetReducerFileGroup.parseFrom(message.getPayload)
        GetReducerFileGroup(
          pbGetReducerFileGroup.getShuffleId)

      case GET_REDUCER_FILE_GROUP_RESPONSE_VALUE =>
        val pbGetReducerFileGroupResponse = PbGetReducerFileGroupResponse
          .parseFrom(message.getPayload)
        // Convert each per-partition protobuf file group into a Java set of partition locations.
        val fileGroup = pbGetReducerFileGroupResponse.getFileGroupsMap.asScala.map {
          case (partitionId, pbFileGroup) =>
            (
              partitionId,
              pbFileGroup.getLocationsList.asScala.map(
                PbSerDeUtils.fromPbPartitionLocation).toSet.asJava)
        }.asJava

        val attempts = pbGetReducerFileGroupResponse.getAttemptsList.asScala.map(_.toInt).toArray
        val partitionIds = new util.HashSet(pbGetReducerFileGroupResponse.getPartitionIdsList)
        GetReducerFileGroupResponse(
          Utils.toStatusCode(pbGetReducerFileGroupResponse.getStatus),
          fileGroup,
          attempts,
          partitionIds)

      case UNREGISTER_SHUFFLE_VALUE =>
        PbUnregisterShuffle.parseFrom(message.getPayload)

      case UNREGISTER_SHUFFLE_RESPONSE_VALUE =>
        PbUnregisterShuffleResponse.parseFrom(message.getPayload)

      case APPLICATION_LOST_VALUE =>
        val pbApplicationLost = PbApplicationLost.parseFrom(message.getPayload)
        ApplicationLost(pbApplicationLost.getAppId, pbApplicationLost.getRequestId)

      case APPLICATION_LOST_RESPONSE_VALUE =>
        val pbApplicationLostResponse = PbApplicationLostResponse.parseFrom(message.getPayload)
        ApplicationLostResponse(Utils.toStatusCode(pbApplicationLostResponse.getStatus))

      case HEARTBEAT_FROM_APPLICATION_VALUE =>
        val pbHeartbeatFromApplication = PbHeartbeatFromApplication.parseFrom(message.getPayload)
        HeartbeatFromApplication(
          pbHeartbeatFromApplication.getAppId,
          pbHeartbeatFromApplication.getTotalWritten,
          pbHeartbeatFromApplication.getFileCount,
          new util.ArrayList[WorkerInfo](
            pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala
              .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
          pbHeartbeatFromApplication.getRequestId,
          pbHeartbeatFromApplication.getShouldResponse)

      case HEARTBEAT_FROM_APPLICATION_RESPONSE_VALUE =>
        val pbHeartbeatFromApplicationResponse =
          PbHeartbeatFromApplicationResponse.parseFrom(message.getPayload)
        HeartbeatFromApplicationResponse(
          Utils.toStatusCode(pbHeartbeatFromApplicationResponse.getStatus),
          pbHeartbeatFromApplicationResponse.getExcludedWorkersList.asScala
            .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
          pbHeartbeatFromApplicationResponse.getUnknownWorkersList.asScala
            .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
          pbHeartbeatFromApplicationResponse.getShuttingWorkersList.asScala
            .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)

      case CHECK_QUOTA_VALUE =>
        val pbCheckAvailable = PbCheckQuota.parseFrom(message.getPayload)
        CheckQuota(PbSerDeUtils.fromPbUserIdentifier(pbCheckAvailable.getUserIdentifier))

      case CHECK_QUOTA_RESPONSE_VALUE =>
        val pbCheckAvailableResponse = PbCheckQuotaResponse
          .parseFrom(message.getPayload)
        CheckQuotaResponse(
          pbCheckAvailableResponse.getAvailable,
          pbCheckAvailableResponse.getReason)

      // REPORT_WORKER_FAILURE carries a PbReportWorkerUnavailable payload.
      case REPORT_WORKER_FAILURE_VALUE =>
        val pbReportWorkerUnavailable = PbReportWorkerUnavailable.parseFrom(message.getPayload)
        ReportWorkerUnavailable(
          new util.ArrayList[WorkerInfo](pbReportWorkerUnavailable.getUnavailableList
            .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
          pbReportWorkerUnavailable.getRequestId)

      case REGISTER_WORKER_RESPONSE_VALUE =>
        PbRegisterWorkerResponse.parseFrom(message.getPayload)

      case RESERVE_SLOTS_VALUE =>
        val pbReserveSlots = PbReserveSlots.parseFrom(message.getPayload)
        val userIdentifier = PbSerDeUtils.fromPbUserIdentifier(pbReserveSlots.getUserIdentifier)
        ReserveSlots(
          pbReserveSlots.getApplicationId,
          pbReserveSlots.getShuffleId,
          new util.ArrayList[PartitionLocation](pbReserveSlots.getPrimaryLocationsList.asScala
            .map(PbSerDeUtils.fromPbPartitionLocation).toList.asJava),
          new util.ArrayList[PartitionLocation](pbReserveSlots.getReplicaLocationsList.asScala
            .map(PbSerDeUtils.fromPbPartitionLocation).toList.asJava),
          pbReserveSlots.getSplitThreshold,
          Utils.toShuffleSplitMode(pbReserveSlots.getSplitMode),
          Utils.toPartitionType(pbReserveSlots.getPartitionType),
          pbReserveSlots.getRangeReadFilter,
          userIdentifier,
          pbReserveSlots.getPushDataTimeout)

      case RESERVE_SLOTS_RESPONSE_VALUE =>
        val pbReserveSlotsResponse = PbReserveSlotsResponse.parseFrom(message.getPayload)
        ReserveSlotsResponse(
          Utils.toStatusCode(pbReserveSlotsResponse.getStatus),
          pbReserveSlotsResponse.getReason)

      case COMMIT_FILES_VALUE =>
        val pbCommitFiles = PbCommitFiles.parseFrom(message.getPayload)
        CommitFiles(
          pbCommitFiles.getApplicationId,
          pbCommitFiles.getShuffleId,
          pbCommitFiles.getPrimaryIdsList,
          pbCommitFiles.getReplicaIdsList,
          pbCommitFiles.getMapAttemptsList.asScala.map(_.toInt).toArray,
          pbCommitFiles.getEpoch)

      case COMMIT_FILES_RESPONSE_VALUE =>
        val pbCommitFilesResponse = PbCommitFilesResponse.parseFrom(message.getPayload)
        val committedPrimaryStorageInfos = new util.HashMap[String, StorageInfo]()
        val committedReplicaStorageInfos = new util.HashMap[String, StorageInfo]()
        val committedBitMap = new util.HashMap[String, RoaringBitmap]()
        pbCommitFilesResponse.getCommittedPrimaryStorageInfosMap.asScala.foreach(entry =>
          committedPrimaryStorageInfos.put(entry._1, StorageInfo.fromPb(entry._2)))
        pbCommitFilesResponse.getCommittedReplicaStorageInfosMap.asScala.foreach(entry =>
          committedReplicaStorageInfos.put(entry._1, StorageInfo.fromPb(entry._2)))
        pbCommitFilesResponse.getMapIdBitmapMap.asScala.foreach { entry =>
          committedBitMap.put(entry._1, Utils.byteStringToRoaringBitmap(entry._2))
        }
        CommitFilesResponse(
          Utils.toStatusCode(pbCommitFilesResponse.getStatus),
          pbCommitFilesResponse.getCommittedPrimaryIdsList,
          pbCommitFilesResponse.getCommittedReplicaIdsList,
          pbCommitFilesResponse.getFailedPrimaryIdsList,
          pbCommitFilesResponse.getFailedReplicaIdsList,
          committedPrimaryStorageInfos,
          committedReplicaStorageInfos,
          committedBitMap,
          pbCommitFilesResponse.getTotalWritten,
          pbCommitFilesResponse.getFileCount)

      case DESTROY_VALUE =>
        val pbDestroy = PbDestroyWorkerSlots.parseFrom(message.getPayload)
        DestroyWorkerSlots(
          pbDestroy.getShuffleKey,
          pbDestroy.getPrimaryLocationsList,
          pbDestroy.getReplicaLocationList)

      case DESTROY_RESPONSE_VALUE =>
        val pbDestroyResponse = PbDestroyWorkerSlotsResponse.parseFrom(message.getPayload)
        DestroyWorkerSlotsResponse(
          Utils.toStatusCode(pbDestroyResponse.getStatus),
          pbDestroyResponse.getFailedPrimariesList,
          pbDestroyResponse.getFailedReplicasList)

      // The following message types are returned as shared values; the payload is not parsed.
      case REMOVE_EXPIRED_SHUFFLE_VALUE =>
        RemoveExpiredShuffle

      case ONE_WAY_MESSAGE_RESPONSE_VALUE =>
        OneWayMessageResponse

      case CHECK_WORKER_TIMEOUT_VALUE =>
        pbCheckForWorkerTimeout

      case CHECK_APPLICATION_TIMEOUT_VALUE =>
        CheckForApplicationTimeOut

      case CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT_VALUE =>
        CheckForHDFSExpiredDirsTimeout

      case WORKER_LOST_VALUE =>
        PbWorkerLost.parseFrom(message.getPayload)

      case WORKER_LOST_RESPONSE_VALUE =>
        PbWorkerLostResponse.parseFrom(message.getPayload)

      case STAGE_END_VALUE =>
        val pbStageEnd = PbStageEnd.parseFrom(message.getPayload)
        StageEnd(pbStageEnd.getShuffleId)

      case PARTITION_SPLIT_VALUE =>
        PbPartitionSplit.parseFrom(message.getPayload)

      case STAGE_END_RESPONSE_VALUE =>
        val pbStageEndResponse = PbStageEndResponse.parseFrom(message.getPayload)
        StageEndResponse(Utils.toStatusCode(pbStageEndResponse.getStatus))

      // Any type value without a dedicated branch above would otherwise escape as a bare
      // scala.MatchError with no log entry; report it the same way as an unknown message.
      case _ =>
        val msg = s"received message with unsupported type value $messageTypeValue"
        logError(msg)
        throw new UnsupportedOperationException(msg)
    }
  }