in master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala [412:602]
/**
 * Answers the request messages this master replies to through `context`.
 *
 * Every case decodes its message (protobuf messages are unpacked through their getters and
 * `PbSerDeUtils`) and passes the matching `handle*` call to `executeWithLeaderChecker`.
 * NOTE(review): from its name, `executeWithLeaderChecker` presumably runs the handler only when
 * this master is the leader — confirm against its definition.
 *
 * `checkAuth(context, appId)` is called before the handler is scheduled for these messages:
 * `HeartbeatFromApplication`, `RequestSlots`, `PbBatchUnregisterShuffles`, `PbUnregisterShuffle`
 * and `ApplicationLost`. `ReleaseSlots` is answered with `StatusCode.SUCCESS` directly and is
 * kept only for compatibility. Messages not matched by any case lie outside this partial
 * function's domain.
 *
 * @param context RPC call context of the incoming request, used to reply to the sender
 * @return the partial function that dispatches each supported request to its handler
 */
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case HeartbeatFromApplication(
        appId,
        totalWritten,
        fileCount,
        shuffleFallbackCount,
        shuffleFallbackCounts,
        needCheckedWorkerList,
        requestId,
        shouldResponse) =>
    logDebug(s"Received heartbeat from app $appId")
    checkAuth(context, appId)
    executeWithLeaderChecker(
      context,
      handleHeartbeatFromApplication(
        context,
        appId,
        totalWritten,
        fileCount,
        shuffleFallbackCount,
        shuffleFallbackCounts,
        needCheckedWorkerList,
        requestId,
        shouldResponse))

  case pbRegisterWorker: PbRegisterWorker =>
    val requestId = pbRegisterWorker.getRequestId
    val host = pbRegisterWorker.getHost
    val rpcPort = pbRegisterWorker.getRpcPort
    val pushPort = pbRegisterWorker.getPushPort
    val fetchPort = pbRegisterWorker.getFetchPort
    val replicatePort = pbRegisterWorker.getReplicatePort
    val internalPort = pbRegisterWorker.getInternalPort
    val networkLocation = pbRegisterWorker.getNetworkLocation
    // Disks are keyed by mount point.
    val disks = pbRegisterWorker.getDisksList.asScala
      .map { pbDiskInfo => pbDiskInfo.getMountPoint -> PbSerDeUtils.fromPbDiskInfo(pbDiskInfo) }
      .toMap.asJava
    val userResourceConsumption =
      PbSerDeUtils.fromPbUserResourceConsumption(pbRegisterWorker.getUserResourceConsumptionMap)
    // Print the same host:rpc:push:fetch:replicate tuple as the heartbeat / worker-lost logs so
    // registration can be correlated with later events for the same worker.
    logDebug(s"Received RegisterWorker request $requestId, " +
      s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort $disks.")
    executeWithLeaderChecker(
      context,
      handleRegisterWorker(
        context,
        host,
        rpcPort,
        pushPort,
        fetchPort,
        replicatePort,
        internalPort,
        networkLocation,
        disks,
        userResourceConsumption,
        requestId))

  case ReleaseSlots(_, _, _, _, _) =>
    // keep it for compatible reason
    context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))

  case requestSlots @ RequestSlots(applicationId, _, _, _, _, _, _, _, _, _, _, _, _) =>
    logTrace(s"Received RequestSlots request $requestSlots.")
    checkAuth(context, applicationId)
    executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots))

  case pb: PbBatchUnregisterShuffles =>
    val applicationId = pb.getAppId
    val shuffleIds = pb.getShuffleIdsList.asScala.toList
    val requestId = pb.getRequestId
    logDebug(s"Received BatchUnregisterShuffle request $requestId, $applicationId, $shuffleIds")
    checkAuth(context, applicationId)
    executeWithLeaderChecker(
      context,
      batchHandleUnregisterShuffles(context, applicationId, shuffleIds, requestId))

  case pb: PbUnregisterShuffle =>
    val applicationId = pb.getAppId
    val shuffleId = pb.getShuffleId
    val requestId = pb.getRequestId
    logDebug(s"Received UnregisterShuffle request $requestId, $applicationId, $shuffleId")
    checkAuth(context, applicationId)
    executeWithLeaderChecker(
      context,
      handleUnregisterShuffle(context, applicationId, shuffleId, requestId))

  case ApplicationLost(appId, requestId) =>
    logDebug(
      s"Received ApplicationLost request $requestId, $appId from ${context.senderAddress}.")
    checkAuth(context, appId)
    executeWithLeaderChecker(context, handleApplicationLost(context, appId, requestId))

  case HeartbeatFromWorker(
        host,
        rpcPort,
        pushPort,
        fetchPort,
        replicatePort,
        disks,
        userResourceConsumption,
        activeShuffleKey,
        highWorkload,
        workerStatus,
        requestId) =>
    logDebug(s"Received heartbeat from" +
      s" worker $host:$rpcPort:$pushPort:$fetchPort:$replicatePort with $disks.")
    executeWithLeaderChecker(
      context,
      handleHeartbeatFromWorker(
        context,
        host,
        rpcPort,
        pushPort,
        fetchPort,
        replicatePort,
        disks,
        userResourceConsumption,
        activeShuffleKey,
        highWorkload,
        workerStatus,
        requestId))

  case ReportWorkerUnavailable(failedWorkers: util.List[WorkerInfo], requestId: String) =>
    executeWithLeaderChecker(
      context,
      handleReportNodeUnavailable(context, failedWorkers, requestId))

  case ReportWorkerDecommission(workers: util.List[WorkerInfo], requestId: String) =>
    executeWithLeaderChecker(
      context,
      handleWorkerDecommission(context, workers, requestId))

  case pb: PbReviseLostShuffles =>
    // NOTE(review): this message carries an app id but, unlike the other app-scoped requests
    // above, is not passed to checkAuth — confirm whether that is intentional.
    executeWithLeaderChecker(
      context,
      handleReviseLostShuffle(context, pb.getAppId, pb.getLostShufflesList, pb.getRequestId))

  case pb: PbWorkerExclude =>
    // Copy straight into the ArrayList; no intermediate Scala List is needed.
    val workersToAdd = new util.ArrayList[WorkerInfo](
      pb.getWorkersToAddList.asScala.map(PbSerDeUtils.fromPbWorkerInfo).asJava)
    val workersToRemove = new util.ArrayList[WorkerInfo](
      pb.getWorkersToRemoveList.asScala.map(PbSerDeUtils.fromPbWorkerInfo).asJava)
    executeWithLeaderChecker(
      context,
      handleWorkerExclude(
        context,
        workersToAdd,
        workersToRemove,
        pb.getRequestId))

  case pb: PbWorkerLost =>
    val host = pb.getHost
    val rpcPort = pb.getRpcPort
    val pushPort = pb.getPushPort
    val fetchPort = pb.getFetchPort
    val replicatePort = pb.getReplicatePort
    val requestId = pb.getRequestId
    logInfo(s"Received worker lost $host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
    executeWithLeaderChecker(
      context,
      handleWorkerLost(context, host, rpcPort, pushPort, fetchPort, replicatePort, requestId))

  case CheckQuota(userIdentifier) =>
    executeWithLeaderChecker(context, handleCheckQuota(userIdentifier, context))

  case _: PbCheckWorkersAvailable =>
    executeWithLeaderChecker(context, handleCheckWorkersAvailable(context))

  case pb: PbWorkerEventRequest =>
    val workers = new util.ArrayList[WorkerInfo](
      pb.getWorkersList.asScala.map(PbSerDeUtils.fromPbWorkerInfo).asJava)
    executeWithLeaderChecker(
      context,
      handleWorkerEvent(
        pb.getRequestId,
        pb.getWorkerEventType.getNumber,
        workers,
        context))

  case pb: PbApplicationMetaRequest =>
    // This request is from a worker
    executeWithLeaderChecker(context, handleRequestForApplicationMeta(context, pb))

  case pb: PbRemoveWorkersUnavailableInfo =>
    val unavailableWorkers = new util.ArrayList[WorkerInfo](
      pb.getWorkerInfoList.asScala.map(PbSerDeUtils.fromPbWorkerInfo).asJava)
    executeWithLeaderChecker(
      context,
      handleRemoveWorkersUnavailableInfos(context, unavailableWorkers, pb.getRequestId))
}