in common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala [736:1021]
/**
 * Decodes a [[TransportMessage]] into the object carried by its payload.
 *
 * The message type number selects how the payload is handled:
 *  - for some types the parsed protobuf object (`Pb...`) is returned as is;
 *  - for others the parsed protobuf fields are converted into the matching control message;
 *  - for a few payload-less types a predefined value is returned without reading the payload.
 *
 * @param message the transport message to decode
 * @return the decoded message; the static type is `Any` because the concrete type depends on
 *         the message type number
 * @throws UnsupportedOperationException if the type is `UNKNOWN_MESSAGE_VALUE` or a number
 *                                       that has no case in this method
 */
def fromTransportMessage(message: TransportMessage): Any = {
  // This can be removed when Transport Message removes type field support later.
  val messageTypeValue = message.getMessageTypeValue match {
    case UNKNOWN_MESSAGE_VALUE => message.getType.getNumber
    case _ => message.getMessageTypeValue
  }
  messageTypeValue match {
    case UNKNOWN_MESSAGE_VALUE =>
      val msg = s"received unknown message $message"
      logError(msg)
      throw new UnsupportedOperationException(msg)
    case REGISTER_WORKER_VALUE =>
      PbRegisterWorker.parseFrom(message.getPayload)
    case HEARTBEAT_FROM_WORKER_VALUE =>
      val pbHeartbeatFromWorker = PbHeartbeatFromWorker.parseFrom(message.getPayload)
      val estimatedAppDiskUsage = new util.HashMap[String, java.lang.Long]()
      val userResourceConsumption = PbSerDeUtils.fromPbUserResourceConsumption(
        pbHeartbeatFromWorker.getUserResourceConsumptionMap)
      val pbDisks = pbHeartbeatFromWorker.getDisksList.asScala.map(PbSerDeUtils.fromPbDiskInfo)
      // The protobuf collections are copied into fresh java.util containers.
      if (!pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap.isEmpty) {
        estimatedAppDiskUsage.putAll(pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap)
      }
      val activeShuffleKeys = new util.HashSet[String]()
      if (!pbHeartbeatFromWorker.getActiveShuffleKeysList.isEmpty) {
        activeShuffleKeys.addAll(pbHeartbeatFromWorker.getActiveShuffleKeysList)
      }
      HeartbeatFromWorker(
        pbHeartbeatFromWorker.getHost,
        pbHeartbeatFromWorker.getRpcPort,
        pbHeartbeatFromWorker.getPushPort,
        pbHeartbeatFromWorker.getFetchPort,
        pbHeartbeatFromWorker.getReplicatePort,
        pbDisks,
        userResourceConsumption,
        activeShuffleKeys,
        estimatedAppDiskUsage,
        pbHeartbeatFromWorker.getRequestId)
    case HEARTBEAT_FROM_WORKER_RESPONSE_VALUE =>
      val pbHeartbeatFromWorkerResponse =
        PbHeartbeatFromWorkerResponse.parseFrom(message.getPayload)
      val expiredShuffleKeys = new util.HashSet[String]()
      if (pbHeartbeatFromWorkerResponse.getExpiredShuffleKeysCount > 0) {
        expiredShuffleKeys.addAll(pbHeartbeatFromWorkerResponse.getExpiredShuffleKeysList)
      }
      HeartbeatFromWorkerResponse(expiredShuffleKeys, pbHeartbeatFromWorkerResponse.getRegistered)
    case REGISTER_SHUFFLE_VALUE =>
      PbRegisterShuffle.parseFrom(message.getPayload)
    case REGISTER_MAP_PARTITION_TASK_VALUE =>
      PbRegisterMapPartitionTask.parseFrom(message.getPayload)
    case REGISTER_SHUFFLE_RESPONSE_VALUE =>
      PbRegisterShuffleResponse.parseFrom(message.getPayload)
    case REQUEST_SLOTS_VALUE =>
      val pbRequestSlots = PbRequestSlots.parseFrom(message.getPayload)
      val userIdentifier = PbSerDeUtils.fromPbUserIdentifier(pbRequestSlots.getUserIdentifier)
      RequestSlots(
        pbRequestSlots.getApplicationId,
        pbRequestSlots.getShuffleId,
        new util.ArrayList[Integer](pbRequestSlots.getPartitionIdListList),
        pbRequestSlots.getHostname,
        pbRequestSlots.getShouldReplicate,
        pbRequestSlots.getShouldRackAware,
        userIdentifier,
        pbRequestSlots.getMaxWorkers,
        pbRequestSlots.getRequestId)
    case REQUEST_SLOTS_RESPONSE_VALUE =>
      val pbRequestSlotsResponse = PbRequestSlotsResponse.parseFrom(message.getPayload)
      RequestSlotsResponse(
        Utils.toStatusCode(pbRequestSlotsResponse.getStatus),
        PbSerDeUtils.fromPbWorkerResource(
          pbRequestSlotsResponse.getWorkerResourceMap))
    case CHANGE_LOCATION_VALUE =>
      // The payload of a CHANGE_LOCATION message is parsed as PbRevive.
      PbRevive.parseFrom(message.getPayload)
    case CHANGE_LOCATION_RESPONSE_VALUE =>
      PbChangeLocationResponse.parseFrom(message.getPayload)
    case MAPPER_END_VALUE =>
      val pbMapperEnd = PbMapperEnd.parseFrom(message.getPayload)
      MapperEnd(
        pbMapperEnd.getShuffleId,
        pbMapperEnd.getMapId,
        pbMapperEnd.getAttemptId,
        pbMapperEnd.getNumMappers,
        pbMapperEnd.getPartitionId)
    case MAPPER_END_RESPONSE_VALUE =>
      val pbMapperEndResponse = PbMapperEndResponse.parseFrom(message.getPayload)
      MapperEndResponse(Utils.toStatusCode(pbMapperEndResponse.getStatus))
    case GET_REDUCER_FILE_GROUP_VALUE =>
      val pbGetReducerFileGroup = PbGetReducerFileGroup.parseFrom(message.getPayload)
      GetReducerFileGroup(
        pbGetReducerFileGroup.getShuffleId)
    case GET_REDUCER_FILE_GROUP_RESPONSE_VALUE =>
      val pbGetReducerFileGroupResponse = PbGetReducerFileGroupResponse
        .parseFrom(message.getPayload)
      // Rebuild the map as partition id -> java.util.Set of converted partition locations.
      val fileGroup = pbGetReducerFileGroupResponse.getFileGroupsMap.asScala.map {
        case (partitionId, fileGroup) =>
          (
            partitionId,
            fileGroup.getLocationsList.asScala.map(
              PbSerDeUtils.fromPbPartitionLocation).toSet.asJava)
      }.asJava
      val attempts = pbGetReducerFileGroupResponse.getAttemptsList.asScala.map(_.toInt).toArray
      val partitionIds = new util.HashSet(pbGetReducerFileGroupResponse.getPartitionIdsList)
      GetReducerFileGroupResponse(
        Utils.toStatusCode(pbGetReducerFileGroupResponse.getStatus),
        fileGroup,
        attempts,
        partitionIds)
    case UNREGISTER_SHUFFLE_VALUE =>
      PbUnregisterShuffle.parseFrom(message.getPayload)
    case UNREGISTER_SHUFFLE_RESPONSE_VALUE =>
      PbUnregisterShuffleResponse.parseFrom(message.getPayload)
    case APPLICATION_LOST_VALUE =>
      val pbApplicationLost = PbApplicationLost.parseFrom(message.getPayload)
      ApplicationLost(pbApplicationLost.getAppId, pbApplicationLost.getRequestId)
    case APPLICATION_LOST_RESPONSE_VALUE =>
      val pbApplicationLostResponse = PbApplicationLostResponse.parseFrom(message.getPayload)
      ApplicationLostResponse(Utils.toStatusCode(pbApplicationLostResponse.getStatus))
    case HEARTBEAT_FROM_APPLICATION_VALUE =>
      val pbHeartbeatFromApplication = PbHeartbeatFromApplication.parseFrom(message.getPayload)
      HeartbeatFromApplication(
        pbHeartbeatFromApplication.getAppId,
        pbHeartbeatFromApplication.getTotalWritten,
        pbHeartbeatFromApplication.getFileCount,
        new util.ArrayList[WorkerInfo](
          pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala
            .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
        pbHeartbeatFromApplication.getRequestId,
        pbHeartbeatFromApplication.getShouldResponse)
    case HEARTBEAT_FROM_APPLICATION_RESPONSE_VALUE =>
      val pbHeartbeatFromApplicationResponse =
        PbHeartbeatFromApplicationResponse.parseFrom(message.getPayload)
      HeartbeatFromApplicationResponse(
        Utils.toStatusCode(pbHeartbeatFromApplicationResponse.getStatus),
        pbHeartbeatFromApplicationResponse.getExcludedWorkersList.asScala
          .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
        pbHeartbeatFromApplicationResponse.getUnknownWorkersList.asScala
          .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
        pbHeartbeatFromApplicationResponse.getShuttingWorkersList.asScala
          .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
    case CHECK_QUOTA_VALUE =>
      val pbCheckAvailable = PbCheckQuota.parseFrom(message.getPayload)
      CheckQuota(PbSerDeUtils.fromPbUserIdentifier(pbCheckAvailable.getUserIdentifier))
    case CHECK_QUOTA_RESPONSE_VALUE =>
      val pbCheckAvailableResponse = PbCheckQuotaResponse
        .parseFrom(message.getPayload)
      CheckQuotaResponse(
        pbCheckAvailableResponse.getAvailable,
        pbCheckAvailableResponse.getReason)
    case REPORT_WORKER_FAILURE_VALUE =>
      val pbReportWorkerUnavailable = PbReportWorkerUnavailable.parseFrom(message.getPayload)
      ReportWorkerUnavailable(
        new util.ArrayList[WorkerInfo](pbReportWorkerUnavailable.getUnavailableList
          .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
        pbReportWorkerUnavailable.getRequestId)
    case REGISTER_WORKER_RESPONSE_VALUE =>
      PbRegisterWorkerResponse.parseFrom(message.getPayload)
    case RESERVE_SLOTS_VALUE =>
      val pbReserveSlots = PbReserveSlots.parseFrom(message.getPayload)
      val userIdentifier = PbSerDeUtils.fromPbUserIdentifier(pbReserveSlots.getUserIdentifier)
      ReserveSlots(
        pbReserveSlots.getApplicationId,
        pbReserveSlots.getShuffleId,
        new util.ArrayList[PartitionLocation](pbReserveSlots.getPrimaryLocationsList.asScala
          .map(PbSerDeUtils.fromPbPartitionLocation).toList.asJava),
        new util.ArrayList[PartitionLocation](pbReserveSlots.getReplicaLocationsList.asScala
          .map(PbSerDeUtils.fromPbPartitionLocation).toList.asJava),
        pbReserveSlots.getSplitThreshold,
        Utils.toShuffleSplitMode(pbReserveSlots.getSplitMode),
        Utils.toPartitionType(pbReserveSlots.getPartitionType),
        pbReserveSlots.getRangeReadFilter,
        userIdentifier,
        pbReserveSlots.getPushDataTimeout)
    case RESERVE_SLOTS_RESPONSE_VALUE =>
      val pbReserveSlotsResponse = PbReserveSlotsResponse.parseFrom(message.getPayload)
      ReserveSlotsResponse(
        Utils.toStatusCode(pbReserveSlotsResponse.getStatus),
        pbReserveSlotsResponse.getReason)
    case COMMIT_FILES_VALUE =>
      val pbCommitFiles = PbCommitFiles.parseFrom(message.getPayload)
      CommitFiles(
        pbCommitFiles.getApplicationId,
        pbCommitFiles.getShuffleId,
        pbCommitFiles.getPrimaryIdsList,
        pbCommitFiles.getReplicaIdsList,
        pbCommitFiles.getMapAttemptsList.asScala.map(_.toInt).toArray,
        pbCommitFiles.getEpoch)
    case COMMIT_FILES_RESPONSE_VALUE =>
      val pbCommitFilesResponse = PbCommitFilesResponse.parseFrom(message.getPayload)
      val committedPrimaryStorageInfos = new util.HashMap[String, StorageInfo]()
      val committedReplicaStorageInfos = new util.HashMap[String, StorageInfo]()
      val committedBitMap = new util.HashMap[String, RoaringBitmap]()
      // Convert each protobuf map value while copying the entries into the java.util maps.
      pbCommitFilesResponse.getCommittedPrimaryStorageInfosMap.asScala.foreach(entry =>
        committedPrimaryStorageInfos.put(entry._1, StorageInfo.fromPb(entry._2)))
      pbCommitFilesResponse.getCommittedReplicaStorageInfosMap.asScala.foreach(entry =>
        committedReplicaStorageInfos.put(entry._1, StorageInfo.fromPb(entry._2)))
      pbCommitFilesResponse.getMapIdBitmapMap.asScala.foreach { entry =>
        committedBitMap.put(entry._1, Utils.byteStringToRoaringBitmap(entry._2))
      }
      CommitFilesResponse(
        Utils.toStatusCode(pbCommitFilesResponse.getStatus),
        pbCommitFilesResponse.getCommittedPrimaryIdsList,
        pbCommitFilesResponse.getCommittedReplicaIdsList,
        pbCommitFilesResponse.getFailedPrimaryIdsList,
        pbCommitFilesResponse.getFailedReplicaIdsList,
        committedPrimaryStorageInfos,
        committedReplicaStorageInfos,
        committedBitMap,
        pbCommitFilesResponse.getTotalWritten,
        pbCommitFilesResponse.getFileCount)
    case DESTROY_VALUE =>
      val pbDestroy = PbDestroyWorkerSlots.parseFrom(message.getPayload)
      DestroyWorkerSlots(
        pbDestroy.getShuffleKey,
        pbDestroy.getPrimaryLocationsList,
        pbDestroy.getReplicaLocationList)
    case DESTROY_RESPONSE_VALUE =>
      val pbDestroyResponse = PbDestroyWorkerSlotsResponse.parseFrom(message.getPayload)
      DestroyWorkerSlotsResponse(
        Utils.toStatusCode(pbDestroyResponse.getStatus),
        pbDestroyResponse.getFailedPrimariesList,
        pbDestroyResponse.getFailedReplicasList)
    // The following types return a predefined value; the payload is not read.
    case REMOVE_EXPIRED_SHUFFLE_VALUE =>
      RemoveExpiredShuffle
    case ONE_WAY_MESSAGE_RESPONSE_VALUE =>
      OneWayMessageResponse
    case CHECK_WORKER_TIMEOUT_VALUE =>
      pbCheckForWorkerTimeout
    case CHECK_APPLICATION_TIMEOUT_VALUE =>
      CheckForApplicationTimeOut
    case CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT_VALUE =>
      CheckForHDFSExpiredDirsTimeout
    case WORKER_LOST_VALUE =>
      PbWorkerLost.parseFrom(message.getPayload)
    case WORKER_LOST_RESPONSE_VALUE =>
      PbWorkerLostResponse.parseFrom(message.getPayload)
    case STAGE_END_VALUE =>
      val pbStageEnd = PbStageEnd.parseFrom(message.getPayload)
      StageEnd(pbStageEnd.getShuffleId)
    case PARTITION_SPLIT_VALUE =>
      PbPartitionSplit.parseFrom(message.getPayload)
    case STAGE_END_RESPONSE_VALUE =>
      val pbStageEndResponse = PbStageEndResponse.parseFrom(message.getPayload)
      StageEndResponse(Utils.toStatusCode(pbStageEndResponse.getStatus))
    case unsupported =>
      // A type number with no case above would otherwise escape as a bare scala.MatchError;
      // report it the same way as UNKNOWN_MESSAGE_VALUE so the failure is logged and typed.
      val msg = s"received message with unsupported type value $unsupported"
      logError(msg)
      throw new UnsupportedOperationException(msg)
  }
}