in master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala [258:370]
/**
 * Handles master RPC requests that expect a reply.
 *
 * Protobuf requests (`PbRegisterWorker`, `PbUnregisterShuffle`, `PbWorkerLost`) are unpacked
 * field by field here; the other requests are matched by their case-class shape. Every matched
 * request is delegated to its `handleXxx` method through `executeWithLeaderChecker(context, ...)`.
 *
 * NOTE(review): `executeWithLeaderChecker` is defined outside this view; its name suggests it
 * only runs the handler when this master is the leader. The handler calls are therefore kept
 * inline as its second argument (never hoisted into a local `val`) in case that parameter is
 * by-name — confirm before restructuring.
 *
 * Messages not matched below are outside this PartialFunction's domain; how they are treated is
 * up to the caller of `receiveAndReply` (presumably the RPC endpoint framework — not visible here).
 *
 * @param context reply channel for the current request, forwarded to every handler
 * @return a partial function over the request messages this master replies to
 */
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case HeartbeatFromApplication(
        appId,
        totalWritten,
        fileCount,
        needCheckedWorkerList,
        requestId,
        shouldResponse) =>
    logDebug(s"Received heartbeat from app $appId")
    executeWithLeaderChecker(
      context,
      handleHeartbeatFromApplication(
        context,
        appId,
        totalWritten,
        fileCount,
        needCheckedWorkerList,
        requestId,
        shouldResponse))

  case pbRegisterWorker: PbRegisterWorker =>
    val requestId = pbRegisterWorker.getRequestId
    val host = pbRegisterWorker.getHost
    val rpcPort = pbRegisterWorker.getRpcPort
    val pushPort = pbRegisterWorker.getPushPort
    val fetchPort = pbRegisterWorker.getFetchPort
    val replicatePort = pbRegisterWorker.getReplicatePort
    // Disk infos keyed by mount point, handed to the handler as a Java map.
    val disks = pbRegisterWorker.getDisksList.asScala
      .map { pbDiskInfo => pbDiskInfo.getMountPoint -> PbSerDeUtils.fromPbDiskInfo(pbDiskInfo) }
      .toMap.asJava
    val userResourceConsumption =
      PbSerDeUtils.fromPbUserResourceConsumption(pbRegisterWorker.getUserResourceConsumptionMap)
    // Log the full host:rpc:push:fetch:replicate identity, like the heartbeat/worker-lost logs.
    logDebug(s"Received RegisterWorker request $requestId, " +
      s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort $disks.")
    executeWithLeaderChecker(
      context,
      handleRegisterWorker(
        context,
        host,
        rpcPort,
        pushPort,
        fetchPort,
        replicatePort,
        disks,
        userResourceConsumption,
        requestId))

  // Typed pattern rather than an arity-specific extractor: the fields are not needed here, and
  // adding a field to RequestSlots must not break this match.
  case requestSlots: RequestSlots =>
    logTrace(s"Received RequestSlots request $requestSlots.")
    executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots))

  case pb: PbUnregisterShuffle =>
    val applicationId = pb.getAppId
    val shuffleId = pb.getShuffleId
    val requestId = pb.getRequestId
    logDebug(s"Received UnregisterShuffle request $requestId, $applicationId, $shuffleId")
    executeWithLeaderChecker(
      context,
      handleUnregisterShuffle(context, applicationId, shuffleId, requestId))

  case ApplicationLost(appId, requestId) =>
    logDebug(s"Received ApplicationLost request $requestId, $appId.")
    executeWithLeaderChecker(context, handleApplicationLost(context, appId, requestId))

  case HeartbeatFromWorker(
        host,
        rpcPort,
        pushPort,
        fetchPort,
        replicatePort,
        disks,
        userResourceConsumption,
        activeShuffleKey,
        estimatedAppDiskUsage,
        requestId) =>
    logDebug(s"Received heartbeat from" +
      s" worker $host:$rpcPort:$pushPort:$fetchPort:$replicatePort with $disks.")
    executeWithLeaderChecker(
      context,
      handleHeartbeatFromWorker(
        context,
        host,
        rpcPort,
        pushPort,
        fetchPort,
        replicatePort,
        disks,
        userResourceConsumption,
        activeShuffleKey,
        estimatedAppDiskUsage,
        requestId))

  // The explicit type ascriptions are kept on purpose: unlike plain binders, typed patterns do
  // not match null field values.
  case ReportWorkerUnavailable(failedWorkers: util.List[WorkerInfo], requestId: String) =>
    // Only the count is logged; the WorkerInfo entries themselves may be large.
    logDebug(s"Received ReportWorkerUnavailable request $requestId, " +
      s"${failedWorkers.size()} failed workers.")
    executeWithLeaderChecker(
      context,
      handleReportNodeUnavailable(context, failedWorkers, requestId))

  case pb: PbWorkerLost =>
    val host = pb.getHost
    val rpcPort = pb.getRpcPort
    val pushPort = pb.getPushPort
    val fetchPort = pb.getFetchPort
    val replicatePort = pb.getReplicatePort
    val requestId = pb.getRequestId
    logInfo(s"Received worker lost $host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
    executeWithLeaderChecker(
      context,
      handleWorkerLost(context, host, rpcPort, pushPort, fetchPort, replicatePort, requestId))

  case CheckQuota(userIdentifier) =>
    logDebug(s"Received CheckQuota request for $userIdentifier.")
    executeWithLeaderChecker(context, handleCheckQuota(userIdentifier, context))
}