in master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java [87:242]
/**
 * Applies one meta write request to {@code metaSystem} and builds the matching response.
 *
 * <p>The request's command type selects which sub-request payload is read and which
 * {@code metaSystem} update method is invoked. {@code ReleaseSlots} is accepted but performs no
 * update. A command type without a dedicated branch raises an {@link IOException} inside this
 * method; that exception is caught here and reported through the response instead of being
 * propagated. Only {@link IOException} is caught; any other exception thrown while handling the
 * request propagates to the caller.
 *
 * @param request the write request to apply
 * @return a response whose status is {@code OK} when the selected branch completes, or a response
 *     with {@code success=false}, status {@code INTERNAL_ERROR} and the exception message (when
 *     one is present) if an {@link IOException} was raised
 */
public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest request) {
  ResourceProtos.Type cmdType = request.getCmdType();
  ResourceResponse.Builder responseBuilder = getMasterMetaResponseBuilder(request);
  try {
    // Each case owns its locals inside its own block so that no variable leaks from one
    // command's handling into another's.
    switch (cmdType) {
      case RequestSlots: {
        String shuffleKey = request.getRequestSlotsRequest().getShuffleKey();
        LOG.debug("Handle request slots for {}", shuffleKey);
        metaSystem.updateRequestSlotsMeta(
            shuffleKey, request.getRequestSlotsRequest().getHostName(), new HashMap<>());
        break;
      }
      case ReleaseSlots:
        // Accepted for compatibility of the command set; no meta update is performed here.
        break;
      case UnRegisterShuffle: {
        String shuffleKey = request.getUnregisterShuffleRequest().getShuffleKey();
        LOG.debug("Handle unregister shuffle for {}", shuffleKey);
        metaSystem.updateUnregisterShuffleMeta(shuffleKey);
        break;
      }
      case AppHeartbeat: {
        String appId = request.getAppHeartbeatRequest().getAppId();
        LOG.debug("Handle app heartbeat for {}", appId);
        long time = request.getAppHeartbeatRequest().getTime();
        long totalWritten = request.getAppHeartbeatRequest().getTotalWritten();
        long fileCount = request.getAppHeartbeatRequest().getFileCount();
        metaSystem.updateAppHeartbeatMeta(appId, time, totalWritten, fileCount);
        break;
      }
      case AppLost: {
        String appId = request.getAppLostRequest().getAppId();
        LOG.debug("Handle app lost for {}", appId);
        metaSystem.updateAppLostMeta(appId);
        break;
      }
      case WorkerLost: {
        String host = request.getWorkerLostRequest().getHost();
        int rpcPort = request.getWorkerLostRequest().getRpcPort();
        int pushPort = request.getWorkerLostRequest().getPushPort();
        int fetchPort = request.getWorkerLostRequest().getFetchPort();
        int replicatePort = request.getWorkerLostRequest().getReplicatePort();
        LOG.debug("Handle worker lost for {} {}", host, pushPort);
        metaSystem.updateWorkerLostMeta(host, rpcPort, pushPort, fetchPort, replicatePort);
        break;
      }
      case WorkerRemove: {
        String host = request.getWorkerRemoveRequest().getHost();
        int rpcPort = request.getWorkerRemoveRequest().getRpcPort();
        int pushPort = request.getWorkerRemoveRequest().getPushPort();
        int fetchPort = request.getWorkerRemoveRequest().getFetchPort();
        int replicatePort = request.getWorkerRemoveRequest().getReplicatePort();
        LOG.debug("Handle worker remove for {} {}", host, pushPort);
        metaSystem.updateWorkerRemoveMeta(host, rpcPort, pushPort, fetchPort, replicatePort);
        break;
      }
      case WorkerHeartbeat: {
        String host = request.getWorkerHeartbeatRequest().getHost();
        int rpcPort = request.getWorkerHeartbeatRequest().getRpcPort();
        int pushPort = request.getWorkerHeartbeatRequest().getPushPort();
        int fetchPort = request.getWorkerHeartbeatRequest().getFetchPort();
        Map<String, DiskInfo> diskInfos =
            MetaUtil.fromPbDiskInfos(request.getWorkerHeartbeatRequest().getDisksMap());
        Map<UserIdentifier, ResourceConsumption> userResourceConsumption =
            MetaUtil.fromPbUserResourceConsumption(
                request.getWorkerHeartbeatRequest().getUserResourceConsumptionMap());
        // Copy into a fresh HashMap so metaSystem receives its own mutable map rather than the
        // map handed out by the request object.
        Map<String, Long> estimatedAppDiskUsage =
            new HashMap<>(request.getWorkerHeartbeatRequest().getEstimatedAppDiskUsageMap());
        int replicatePort = request.getWorkerHeartbeatRequest().getReplicatePort();
        LOG.debug(
            "Handle worker heartbeat for {} {} {} {} {} {} {}",
            host,
            rpcPort,
            pushPort,
            fetchPort,
            replicatePort,
            diskInfos,
            userResourceConsumption);
        long time = request.getWorkerHeartbeatRequest().getTime();
        metaSystem.updateWorkerHeartbeatMeta(
            host,
            rpcPort,
            pushPort,
            fetchPort,
            replicatePort,
            diskInfos,
            userResourceConsumption,
            estimatedAppDiskUsage,
            time);
        break;
      }
      case RegisterWorker: {
        String host = request.getRegisterWorkerRequest().getHost();
        int rpcPort = request.getRegisterWorkerRequest().getRpcPort();
        int pushPort = request.getRegisterWorkerRequest().getPushPort();
        int fetchPort = request.getRegisterWorkerRequest().getFetchPort();
        int replicatePort = request.getRegisterWorkerRequest().getReplicatePort();
        Map<String, DiskInfo> diskInfos =
            MetaUtil.fromPbDiskInfos(request.getRegisterWorkerRequest().getDisksMap());
        Map<UserIdentifier, ResourceConsumption> userResourceConsumption =
            MetaUtil.fromPbUserResourceConsumption(
                request.getRegisterWorkerRequest().getUserResourceConsumptionMap());
        LOG.debug(
            "Handle worker register for {} {} {} {} {} {} {}",
            host,
            rpcPort,
            pushPort,
            fetchPort,
            replicatePort,
            diskInfos,
            userResourceConsumption);
        metaSystem.updateRegisterWorkerMeta(
            host,
            rpcPort,
            pushPort,
            fetchPort,
            replicatePort,
            diskInfos,
            userResourceConsumption);
        break;
      }
      case ReportWorkerUnavailable: {
        List<ResourceProtos.WorkerAddress> failedAddress =
            request.getReportWorkerUnavailableRequest().getUnavailableList();
        List<WorkerInfo> failedWorkers =
            failedAddress.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
        metaSystem.updateMetaByReportWorkerUnavailable(failedWorkers);
        break;
      }
      case UpdatePartitionSize:
        metaSystem.updatePartitionSize();
        break;
      default:
        // Routed through the catch below so the caller gets an INTERNAL_ERROR response.
        throw new IOException("Can not parse this command!" + request);
    }
    responseBuilder.setStatus(ResourceProtos.Status.OK);
  } catch (IOException e) {
    // The trailing throwable is logged with its stack trace; the message text is unchanged.
    LOG.warn("Handle meta write request {} failed!", cmdType, e);
    responseBuilder.setSuccess(false);
    responseBuilder.setStatus(ResourceProtos.Status.INTERNAL_ERROR);
    if (e.getMessage() != null) {
      responseBuilder.setMessage(e.getMessage());
    }
  }
  return responseBuilder.build();
}