in common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala [1044:1460]
/**
 * Decodes a [[TransportMessage]] into the control message it carries.
 *
 * The message type is read from `getMessageTypeValue`; when that equals `UNKNOWN_MESSAGE_VALUE`
 * the number of the `getType` field is used instead. The resolved type value then selects how
 * the payload is decoded:
 *  - some types return the protobuf message parsed from `getPayload` as-is
 *    (for example `PbRegisterWorker`),
 *  - some types parse the payload and convert it into a message object
 *    (for example `HeartbeatFromWorker`),
 *  - some types return `message.getParsedPayload()`,
 *  - some types return a fixed object without reading the payload
 *    (for example `OneWayMessageResponse`).
 *
 * NOTE(review): the match below has no default branch, so a type value that is not listed
 * would fail with `scala.MatchError` rather than the `UnsupportedOperationException` raised
 * for `UNKNOWN_MESSAGE_VALUE` -- confirm this is intended.
 *
 * @param message the transport message to decode
 * @return the decoded message; typed as `Any` because the concrete type depends on the
 *         message type
 * @throws UnsupportedOperationException if the resolved type is `UNKNOWN_MESSAGE_VALUE`
 */
def fromTransportMessage(message: TransportMessage): Any = {
// Fall back to the `type` field when `messageTypeValue` is not set.
// This fallback can be removed once TransportMessage drops support for the `type` field.
val messageTypeValue = message.getMessageTypeValue match {
case UNKNOWN_MESSAGE_VALUE => message.getType.getNumber
case _ => message.getMessageTypeValue
}
messageTypeValue match {
// Neither field yielded a usable type: log and reject the message.
case UNKNOWN_MESSAGE_VALUE =>
val msg = s"received unknown message $message"
logError(msg)
throw new UnsupportedOperationException(msg)
// Kept for compatibility reasons.
case RELEASE_SLOTS_VALUE =>
val pbReleaseSlots = PbReleaseSlots.parseFrom(message.getPayload)
// Copy every per-entry slot map into a fresh HashMap.
val slotsList = pbReleaseSlots.getSlotsList.asScala.map(pbSlot =>
new util.HashMap[String, Integer](pbSlot.getSlotMap)).toList.asJava
ReleaseSlots(
pbReleaseSlots.getApplicationId,
pbReleaseSlots.getShuffleId,
new util.ArrayList[String](pbReleaseSlots.getWorkerIdsList),
new util.ArrayList[util.Map[String, Integer]](slotsList),
pbReleaseSlots.getRequestId)
case RELEASE_SLOTS_RESPONSE_VALUE =>
val pbReleaseSlotsResponse = PbReleaseSlotsResponse.parseFrom(message.getPayload)
ReleaseSlotsResponse(StatusCode.fromValue(pbReleaseSlotsResponse.getStatus))
case REGISTER_WORKER_VALUE =>
PbRegisterWorker.parseFrom(message.getPayload)
case HEARTBEAT_FROM_WORKER_VALUE =>
val pbHeartbeatFromWorker = PbHeartbeatFromWorker.parseFrom(message.getPayload)
val userResourceConsumption = PbSerDeUtils.fromPbUserResourceConsumption(
pbHeartbeatFromWorker.getUserResourceConsumptionMap)
val pbDisks =
pbHeartbeatFromWorker.getDisksList.asScala.toSeq.map(PbSerDeUtils.fromPbDiskInfo)
// Collect the active shuffle keys into a new set; it stays empty when none were sent.
val activeShuffleKeys = new util.HashSet[String]()
if (!pbHeartbeatFromWorker.getActiveShuffleKeysList.isEmpty) {
activeShuffleKeys.addAll(pbHeartbeatFromWorker.getActiveShuffleKeysList)
}
val workerStatus = PbSerDeUtils.fromPbWorkerStatus(pbHeartbeatFromWorker.getWorkerStatus)
HeartbeatFromWorker(
pbHeartbeatFromWorker.getHost,
pbHeartbeatFromWorker.getRpcPort,
pbHeartbeatFromWorker.getPushPort,
pbHeartbeatFromWorker.getFetchPort,
pbHeartbeatFromWorker.getReplicatePort,
pbDisks,
userResourceConsumption,
activeShuffleKeys,
pbHeartbeatFromWorker.getHighWorkload,
workerStatus,
pbHeartbeatFromWorker.getRequestId)
case HEARTBEAT_FROM_WORKER_RESPONSE_VALUE =>
val pbHeartbeatFromWorkerResponse =
PbHeartbeatFromWorkerResponse.parseFrom(message.getPayload)
// Collect the expired shuffle keys into a new set; it stays empty when none were sent.
val expiredShuffleKeys = new util.HashSet[String]()
if (pbHeartbeatFromWorkerResponse.getExpiredShuffleKeysCount > 0) {
expiredShuffleKeys.addAll(pbHeartbeatFromWorkerResponse.getExpiredShuffleKeysList)
}
HeartbeatFromWorkerResponse(
expiredShuffleKeys,
pbHeartbeatFromWorkerResponse.getRegistered,
pbHeartbeatFromWorkerResponse.getWorkerEventType)
case REGISTER_SHUFFLE_VALUE =>
PbRegisterShuffle.parseFrom(message.getPayload)
case REGISTER_MAP_PARTITION_TASK_VALUE =>
PbRegisterMapPartitionTask.parseFrom(message.getPayload)
case REGISTER_SHUFFLE_RESPONSE_VALUE =>
PbRegisterShuffleResponse.parseFrom(message.getPayload)
case REQUEST_SLOTS_VALUE =>
val pbRequestSlots = PbRequestSlots.parseFrom(message.getPayload)
val userIdentifier = PbSerDeUtils.fromPbUserIdentifier(pbRequestSlots.getUserIdentifier)
val excludedWorkerInfoSet =
pbRequestSlots.getExcludedWorkerSetList.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toSet
RequestSlots(
pbRequestSlots.getApplicationId,
pbRequestSlots.getShuffleId,
new util.ArrayList[Integer](pbRequestSlots.getPartitionIdListList),
pbRequestSlots.getHostname,
pbRequestSlots.getShouldReplicate,
pbRequestSlots.getShouldRackAware,
userIdentifier,
pbRequestSlots.getMaxWorkers,
pbRequestSlots.getAvailableStorageTypes,
excludedWorkerInfoSet,
pbRequestSlots.getPacked,
pbRequestSlots.getTagsExpr,
pbRequestSlots.getRequestId)
case REQUEST_SLOTS_RESPONSE_VALUE =>
val pbRequestSlotsResponse = PbRequestSlotsResponse.parseFrom(message.getPayload)
// The worker resource comes in two encodings: use the packed map when the plain
// worker resource map is empty, otherwise use the plain map.
val workerResource =
if (pbRequestSlotsResponse.getWorkerResourceMap.isEmpty) {
PbSerDeUtils.fromPbPackedWorkerResource(
pbRequestSlotsResponse.getPackedWorkerResourceMap)
} else {
PbSerDeUtils.fromPbWorkerResource(
pbRequestSlotsResponse.getWorkerResourceMap)
}
RequestSlotsResponse(
StatusCode.fromValue(pbRequestSlotsResponse.getStatus),
workerResource)
// CHANGE_LOCATION payloads are parsed as PbRevive.
case CHANGE_LOCATION_VALUE =>
PbRevive.parseFrom(message.getPayload)
case CHANGE_LOCATION_RESPONSE_VALUE =>
PbChangeLocationResponse.parseFrom(message.getPayload)
case MAPPER_END_VALUE =>
val pbMapperEnd = PbMapperEnd.parseFrom(message.getPayload)
MapperEnd(
pbMapperEnd.getShuffleId,
pbMapperEnd.getMapId,
pbMapperEnd.getAttemptId,
pbMapperEnd.getNumMappers,
pbMapperEnd.getPartitionId,
// Convert each push-failure batch set, keeping its map key.
pbMapperEnd.getPushFailureBatchesMap.asScala.map {
case (partitionId, pushFailedBatchSet) =>
(partitionId, PbSerDeUtils.fromPbPushFailedBatchSet(pushFailedBatchSet))
}.toMap.asJava)
case MAPPER_END_RESPONSE_VALUE =>
val pbMapperEndResponse = PbMapperEndResponse.parseFrom(message.getPayload)
MapperEndResponse(StatusCode.fromValue(pbMapperEndResponse.getStatus))
case GET_REDUCER_FILE_GROUP_VALUE =>
val pbGetReducerFileGroup = PbGetReducerFileGroup.parseFrom(message.getPayload)
// The serde version is taken from the transport message itself, not from the payload.
GetReducerFileGroup(
pbGetReducerFileGroup.getShuffleId,
pbGetReducerFileGroup.getIsSegmentGranularityVisible,
message.getSerdeVersion)
case GET_REDUCER_FILE_GROUP_RESPONSE_VALUE =>
val pbGetReducerFileGroupResponse = PbGetReducerFileGroupResponse
.parseFrom(message.getPayload)
val fileGroup = pbGetReducerFileGroupResponse.getFileGroupsMap.asScala.map {
case (partitionId, fileGroup) =>
val locationsSet: java.util.Set[PartitionLocation] =
new util.LinkedHashSet[PartitionLocation]()
// In PbGetReducerFileGroupResponse, locations with the same
// uniqueId will not be put into the location set; see the logic in
// org.apache.celeborn.client.commit.CommitHandler.parallelCommitFiles.
// This is why the primary and replica location lists are joined here.
val (pris, reps) = PbSerDeUtils.fromPbPackedPartitionLocationsPair(
fileGroup.getPartitionLocationsPair)
locationsSet.addAll(pris)
locationsSet.addAll(reps)
(
partitionId,
locationsSet)
}.asJava
val attempts = pbGetReducerFileGroupResponse.getAttemptsList.asScala.map(_.toInt).toArray
val partitionIds = new util.HashSet(pbGetReducerFileGroupResponse.getPartitionIdsList)
val pushFailedBatches = pbGetReducerFileGroupResponse.getPushFailedBatchesMap.asScala.map {
case (uniqueId, pushFailedBatchSet) =>
(uniqueId, PbSerDeUtils.fromPbPushFailedBatchSet(pushFailedBatchSet))
}.toMap.asJava
val broadcast = pbGetReducerFileGroupResponse.getBroadcast.toByteArray
GetReducerFileGroupResponse(
StatusCode.fromValue(pbGetReducerFileGroupResponse.getStatus),
fileGroup,
attempts,
partitionIds,
pushFailedBatches,
broadcast)
// The next four types return `getParsedPayload()` instead of parsing `getPayload` here.
case GET_SHUFFLE_ID_VALUE =>
message.getParsedPayload()
case GET_SHUFFLE_ID_RESPONSE_VALUE =>
message.getParsedPayload()
case REPORT_SHUFFLE_FETCH_FAILURE_VALUE =>
message.getParsedPayload()
case REPORT_SHUFFLE_FETCH_FAILURE_RESPONSE_VALUE =>
message.getParsedPayload()
case UNREGISTER_SHUFFLE_VALUE =>
PbUnregisterShuffle.parseFrom(message.getPayload)
case BATCH_UNREGISTER_SHUFFLES_VALUE =>
PbBatchUnregisterShuffles.parseFrom(message.getPayload)
case UNREGISTER_SHUFFLE_RESPONSE_VALUE =>
PbUnregisterShuffleResponse.parseFrom(message.getPayload)
case BATCH_UNREGISTER_SHUFFLE_RESPONSE_VALUE =>
PbBatchUnregisterShuffleResponse.parseFrom(message.getPayload)
case APPLICATION_LOST_VALUE =>
val pbApplicationLost = PbApplicationLost.parseFrom(message.getPayload)
ApplicationLost(pbApplicationLost.getAppId, pbApplicationLost.getRequestId)
case APPLICATION_LOST_RESPONSE_VALUE =>
val pbApplicationLostResponse = PbApplicationLostResponse.parseFrom(message.getPayload)
ApplicationLostResponse(StatusCode.fromValue(pbApplicationLostResponse.getStatus))
case HEARTBEAT_FROM_APPLICATION_VALUE =>
val pbHeartbeatFromApplication = PbHeartbeatFromApplication.parseFrom(message.getPayload)
HeartbeatFromApplication(
pbHeartbeatFromApplication.getAppId,
pbHeartbeatFromApplication.getTotalWritten,
pbHeartbeatFromApplication.getFileCount,
pbHeartbeatFromApplication.getShuffleCount,
pbHeartbeatFromApplication.getShuffleFallbackCountsMap,
new util.ArrayList[WorkerInfo](
pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
pbHeartbeatFromApplication.getRequestId,
pbHeartbeatFromApplication.getShouldResponse)
case HEARTBEAT_FROM_APPLICATION_RESPONSE_VALUE =>
val pbHeartbeatFromApplicationResponse =
PbHeartbeatFromApplicationResponse.parseFrom(message.getPayload)
// The quota check result is nested in the heartbeat response and is unpacked below.
val pbCheckQuotaResponse = pbHeartbeatFromApplicationResponse.getCheckQuotaResponse
HeartbeatFromApplicationResponse(
StatusCode.fromValue(pbHeartbeatFromApplicationResponse.getStatus),
pbHeartbeatFromApplicationResponse.getExcludedWorkersList.asScala
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
pbHeartbeatFromApplicationResponse.getUnknownWorkersList.asScala
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
pbHeartbeatFromApplicationResponse.getShuttingWorkersList.asScala
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava,
pbHeartbeatFromApplicationResponse.getRegisteredShufflesList,
CheckQuotaResponse(pbCheckQuotaResponse.getAvailable, pbCheckQuotaResponse.getReason))
case CHECK_QUOTA_VALUE =>
val pbCheckAvailable = PbCheckQuota.parseFrom(message.getPayload)
CheckQuota(PbSerDeUtils.fromPbUserIdentifier(pbCheckAvailable.getUserIdentifier))
case CHECK_QUOTA_RESPONSE_VALUE =>
val pbCheckAvailableResponse = PbCheckQuotaResponse
.parseFrom(message.getPayload)
CheckQuotaResponse(
pbCheckAvailableResponse.getAvailable,
pbCheckAvailableResponse.getReason)
// REPORT_WORKER_FAILURE payloads are parsed as PbReportWorkerUnavailable.
case REPORT_WORKER_FAILURE_VALUE =>
val pbReportWorkerUnavailable = PbReportWorkerUnavailable.parseFrom(message.getPayload)
ReportWorkerUnavailable(
new util.ArrayList[WorkerInfo](pbReportWorkerUnavailable.getUnavailableList
.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
pbReportWorkerUnavailable.getRequestId)
case REPORT_WORKER_DECOMMISSION_VALUE =>
val pbReportWorkerDecommission = PbReportWorkerDecommission.parseFrom(message.getPayload)
ReportWorkerDecommission(
new util.ArrayList[WorkerInfo](pbReportWorkerDecommission.getWorkersList
.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
pbReportWorkerDecommission.getRequestId)
case REMOVE_WORKERS_UNAVAILABLE_INFO_VALUE =>
PbRemoveWorkersUnavailableInfo.parseFrom(message.getPayload)
case REGISTER_WORKER_RESPONSE_VALUE =>
PbRegisterWorkerResponse.parseFrom(message.getPayload)
case WORKER_EVENT_REQUEST_VALUE =>
PbWorkerEventRequest.parseFrom(message.getPayload)
case WORKER_EVENT_RESPONSE_VALUE =>
PbWorkerEventResponse.parseFrom(message.getPayload)
case RESERVE_SLOTS_VALUE =>
val pbReserveSlots = PbReserveSlots.parseFrom(message.getPayload)
val userIdentifier = PbSerDeUtils.fromPbUserIdentifier(pbReserveSlots.getUserIdentifier)
// Locations come in two encodings: when both plain location lists are empty, decode the
// packed locations pair; otherwise convert the plain primary and replica lists.
val (primaryLocations, replicateLocations) =
if (pbReserveSlots.getPrimaryLocationsList.isEmpty && pbReserveSlots.getReplicaLocationsList.isEmpty) {
PbSerDeUtils.fromPbPackedPartitionLocationsPair(
pbReserveSlots.getPartitionLocationsPair)
} else {
(
new util.ArrayList[PartitionLocation](pbReserveSlots.getPrimaryLocationsList.asScala
.map(PbSerDeUtils.fromPbPartitionLocation).toList.asJava),
new util.ArrayList[PartitionLocation](pbReserveSlots.getReplicaLocationsList.asScala
.map(PbSerDeUtils.fromPbPartitionLocation).toList.asJava))
}
ReserveSlots(
pbReserveSlots.getApplicationId,
pbReserveSlots.getShuffleId,
primaryLocations,
replicateLocations,
pbReserveSlots.getSplitThreshold,
Utils.toShuffleSplitMode(pbReserveSlots.getSplitMode),
Utils.toPartitionType(pbReserveSlots.getPartitionType),
pbReserveSlots.getRangeReadFilter,
userIdentifier,
pbReserveSlots.getPushDataTimeout,
pbReserveSlots.getPartitionSplitEnabled,
pbReserveSlots.getIsSegmentGranularityVisible)
case RESERVE_SLOTS_RESPONSE_VALUE =>
val pbReserveSlotsResponse = PbReserveSlotsResponse.parseFrom(message.getPayload)
ReserveSlotsResponse(
StatusCode.fromValue(pbReserveSlotsResponse.getStatus),
pbReserveSlotsResponse.getReason)
case COMMIT_FILES_VALUE =>
val pbCommitFiles = PbCommitFiles.parseFrom(message.getPayload)
CommitFiles(
pbCommitFiles.getApplicationId,
pbCommitFiles.getShuffleId,
pbCommitFiles.getPrimaryIdsList,
pbCommitFiles.getReplicaIdsList,
pbCommitFiles.getMapAttemptsList.asScala.map(_.toInt).toArray,
pbCommitFiles.getEpoch,
pbCommitFiles.getMockFailure)
case COMMIT_FILES_RESPONSE_VALUE =>
val pbCommitFilesResponse = PbCommitFilesResponse.parseFrom(message.getPayload)
// Rebuild the three per-key maps, converting each protobuf value on the way.
val committedPrimaryStorageInfos = new util.HashMap[String, StorageInfo]()
val committedReplicaStorageInfos = new util.HashMap[String, StorageInfo]()
val committedBitMap = new util.HashMap[String, RoaringBitmap]()
pbCommitFilesResponse.getCommittedPrimaryStorageInfosMap.asScala.foreach(entry =>
committedPrimaryStorageInfos.put(entry._1, StorageInfo.fromPb(entry._2)))
pbCommitFilesResponse.getCommittedReplicaStorageInfosMap.asScala.foreach(entry =>
committedReplicaStorageInfos.put(entry._1, StorageInfo.fromPb(entry._2)))
pbCommitFilesResponse.getMapIdBitmapMap.asScala.foreach { entry =>
committedBitMap.put(entry._1, Utils.byteStringToRoaringBitmap(entry._2))
}
CommitFilesResponse(
StatusCode.fromValue(pbCommitFilesResponse.getStatus),
pbCommitFilesResponse.getCommittedPrimaryIdsList,
pbCommitFilesResponse.getCommittedReplicaIdsList,
pbCommitFilesResponse.getFailedPrimaryIdsList,
pbCommitFilesResponse.getFailedReplicaIdsList,
committedPrimaryStorageInfos,
committedReplicaStorageInfos,
committedBitMap,
pbCommitFilesResponse.getTotalWritten,
pbCommitFilesResponse.getFileCount)
// DESTROY payloads are parsed as PbDestroyWorkerSlots.
case DESTROY_VALUE =>
val pbDestroy = PbDestroyWorkerSlots.parseFrom(message.getPayload)
DestroyWorkerSlots(
pbDestroy.getShuffleKey,
pbDestroy.getPrimaryLocationsList,
pbDestroy.getReplicaLocationList,
pbDestroy.getMockFailure)
case DESTROY_RESPONSE_VALUE =>
val pbDestroyResponse = PbDestroyWorkerSlotsResponse.parseFrom(message.getPayload)
DestroyWorkerSlotsResponse(
StatusCode.fromValue(pbDestroyResponse.getStatus),
pbDestroyResponse.getFailedPrimariesList,
pbDestroyResponse.getFailedReplicasList)
// The next five types return a fixed object; the payload is not read.
case REMOVE_EXPIRED_SHUFFLE_VALUE =>
RemoveExpiredShuffle
case ONE_WAY_MESSAGE_RESPONSE_VALUE =>
OneWayMessageResponse
case CHECK_WORKER_TIMEOUT_VALUE =>
pbCheckForWorkerTimeout
case CHECK_APPLICATION_TIMEOUT_VALUE =>
CheckForApplicationTimeOut
case CHECK_FOR_DFS_EXPIRED_DIRS_TIMEOUT_VALUE =>
CheckForDFSExpiredDirsTimeout
case WORKER_LOST_VALUE =>
PbWorkerLost.parseFrom(message.getPayload)
case WORKER_LOST_RESPONSE_VALUE =>
PbWorkerLostResponse.parseFrom(message.getPayload)
case STAGE_END_VALUE =>
val pbStageEnd = PbStageEnd.parseFrom(message.getPayload)
StageEnd(pbStageEnd.getShuffleId)
case PARTITION_SPLIT_VALUE =>
PbPartitionSplit.parseFrom(message.getPayload)
case STAGE_END_RESPONSE_VALUE =>
val pbStageEndResponse = PbStageEndResponse.parseFrom(message.getPayload)
StageEndResponse(StatusCode.fromValue(pbStageEndResponse.getStatus))
case CHECK_WORKERS_AVAILABLE_VALUE =>
PbCheckWorkersAvailable.parseFrom(message.getPayload)
case CHECK_WORKERS_AVAILABLE_RESPONSE_VALUE =>
PbCheckWorkersAvailableResponse.parseFrom(message.getPayload)
case APPLICATION_META_VALUE =>
PbApplicationMeta.parseFrom(message.getPayload)
case APPLICATION_META_REQUEST_VALUE =>
PbApplicationMetaRequest.parseFrom(message.getPayload)
case REPORT_BARRIER_STAGE_ATTEMPT_FAILURE_VALUE =>
PbReportBarrierStageAttemptFailure.parseFrom(message.getPayload)
case REPORT_BARRIER_STAGE_ATTEMPT_FAILURE_RESPONSE_VALUE =>
PbReportBarrierStageAttemptFailureResponse.parseFrom(message.getPayload)
case PUSH_MERGED_DATA_SPLIT_PARTITION_INFO_VALUE =>
PbPushMergedDataSplitPartitionInfo.parseFrom(message.getPayload)
}
}