in master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java [91:316]
/**
 * Handles a meta write request by applying it to {@code metaSystem} and building the response.
 *
 * <p>The request is dispatched on its {@code cmdType}; each branch unpacks the matching
 * sub-request and forwards its fields to the corresponding {@code metaSystem} update method.
 * {@code ReleaseSlots} is accepted but performs no update. An unrecognized command type is
 * reported as an {@link IOException}.
 *
 * <p>Only {@link IOException} is handled here: it is logged and turned into a response with
 * {@code success = false}, status {@code INTERNAL_ERROR} and, when present, the exception
 * message. Any other exception propagates to the caller.
 *
 * @param request the write request to apply
 * @return the response started by {@code getMasterMetaResponseBuilder(request)}, with status
 *     {@code OK} when the request was applied
 */
public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest request) {
  ResourceProtos.Type cmdType = request.getCmdType();
  ResourceResponse.Builder responseBuilder = getMasterMetaResponseBuilder(request);
  try {
    // Each case gets its own braced scope so locals are declared at first use and cannot
    // leak into (or be accidentally reused by) later cases.
    switch (cmdType) {
      case ReviseLostShuffles:
        {
          String appId = request.getReviseLostShufflesRequest().getAppId();
          List<Integer> lostShuffles =
              request.getReviseLostShufflesRequest().getLostShufflesList();
          LOG.info(
              "Handle revise lost shuffles for {} {}",
              appId,
              StringUtils.join(lostShuffles, ","));
          metaSystem.reviseLostShuffles(appId, lostShuffles);
          break;
        }
      case RequestSlots:
        {
          String shuffleKey = request.getRequestSlotsRequest().getShuffleKey();
          LOG.debug("Handle request slots for {}", shuffleKey);
          // Only the shuffle key and host name are taken from the request; an empty map is
          // passed as the third argument.
          metaSystem.updateRequestSlotsMeta(
              shuffleKey, request.getRequestSlotsRequest().getHostName(), new HashMap<>());
          break;
        }
      case ReleaseSlots:
        // Intentionally a no-op: no meta update is applied for this command.
        break;
      case UnRegisterShuffle:
        {
          String shuffleKey = request.getUnregisterShuffleRequest().getShuffleKey();
          LOG.debug("Handle unregister shuffle for {}", shuffleKey);
          metaSystem.updateUnregisterShuffleMeta(shuffleKey);
          break;
        }
      case BatchUnRegisterShuffle:
        {
          List<String> shuffleKeys =
              request.getBatchUnregisterShuffleRequest().getShuffleKeysList();
          // Log before applying, like every other branch, so the entry is emitted even if the
          // update fails.
          LOG.debug("Handle batch unregister shuffle for {}", shuffleKeys);
          metaSystem.updateBatchUnregisterShuffleMeta(shuffleKeys);
          break;
        }
      case AppHeartbeat:
        {
          String appId = request.getAppHeartbeatRequest().getAppId();
          long time = request.getAppHeartbeatRequest().getTime();
          long totalWritten = request.getAppHeartbeatRequest().getTotalWritten();
          long fileCount = request.getAppHeartbeatRequest().getFileCount();
          long shuffleCount = request.getAppHeartbeatRequest().getShuffleCount();
          LOG.debug("Handle app heartbeat for {} with shuffle count {}", appId, shuffleCount);
          Map<String, Long> shuffleFallbackCounts =
              request.getAppHeartbeatRequest().getShuffleFallbackCountsMap();
          if (CollectionUtils.isNotEmpty(shuffleFallbackCounts)) {
            // Surface the total number of fallbacks reported by this heartbeat.
            LOG.warn(
                "{} shuffle fallbacks in app {}",
                shuffleFallbackCounts.values().stream().mapToLong(v -> v).sum(),
                appId);
          }
          metaSystem.updateAppHeartbeatMeta(
              appId, time, totalWritten, fileCount, shuffleCount, shuffleFallbackCounts);
          break;
        }
      case AppLost:
        {
          String appId = request.getAppLostRequest().getAppId();
          LOG.debug("Handle app lost for {}", appId);
          metaSystem.updateAppLostMeta(appId);
          break;
        }
      case WorkerExclude:
        {
          List<WorkerInfo> workersToAdd =
              toWorkerInfos(request.getWorkerExcludeRequest().getWorkersToAddList());
          List<WorkerInfo> workersToRemove =
              toWorkerInfos(request.getWorkerExcludeRequest().getWorkersToRemoveList());
          metaSystem.updateManuallyExcludedWorkersMeta(workersToAdd, workersToRemove);
          break;
        }
      case WorkerLost:
        {
          String host = request.getWorkerLostRequest().getHost();
          int rpcPort = request.getWorkerLostRequest().getRpcPort();
          int pushPort = request.getWorkerLostRequest().getPushPort();
          int fetchPort = request.getWorkerLostRequest().getFetchPort();
          int replicatePort = request.getWorkerLostRequest().getReplicatePort();
          LOG.debug("Handle worker lost for {} {}", host, pushPort);
          metaSystem.updateWorkerLostMeta(host, rpcPort, pushPort, fetchPort, replicatePort);
          break;
        }
      case WorkerRemove:
        {
          // TODO: Remove `WorkerRemove` in 0.7.x version to guarantee upgrade compatibility.
          String host = request.getWorkerRemoveRequest().getHost();
          int rpcPort = request.getWorkerRemoveRequest().getRpcPort();
          int pushPort = request.getWorkerRemoveRequest().getPushPort();
          int fetchPort = request.getWorkerRemoveRequest().getFetchPort();
          int replicatePort = request.getWorkerRemoveRequest().getReplicatePort();
          LOG.debug("Handle worker remove for {} {}", host, pushPort);
          metaSystem.updateWorkerRemoveMeta(host, rpcPort, pushPort, fetchPort, replicatePort);
          break;
        }
      case WorkerHeartbeat:
        {
          String host = request.getWorkerHeartbeatRequest().getHost();
          int rpcPort = request.getWorkerHeartbeatRequest().getRpcPort();
          int pushPort = request.getWorkerHeartbeatRequest().getPushPort();
          int fetchPort = request.getWorkerHeartbeatRequest().getFetchPort();
          Map<String, DiskInfo> diskInfos =
              MetaUtil.fromPbDiskInfos(request.getWorkerHeartbeatRequest().getDisksMap());
          int replicatePort = request.getWorkerHeartbeatRequest().getReplicatePort();
          boolean highWorkload = request.getWorkerHeartbeatRequest().getHighWorkload();
          // A heartbeat without an explicit status is treated as a normal worker status.
          WorkerStatus workerStatus;
          if (request.getWorkerHeartbeatRequest().hasWorkerStatus()) {
            workerStatus =
                MetaUtil.fromPbWorkerStatus(request.getWorkerHeartbeatRequest().getWorkerStatus());
          } else {
            workerStatus = WorkerStatus.normalWorkerStatus();
          }
          LOG.debug(
              "Handle worker heartbeat for {} {} {} {} {} {}",
              host,
              rpcPort,
              pushPort,
              fetchPort,
              replicatePort,
              diskInfos);
          metaSystem.updateWorkerHeartbeatMeta(
              host,
              rpcPort,
              pushPort,
              fetchPort,
              replicatePort,
              diskInfos,
              request.getWorkerHeartbeatRequest().getTime(),
              workerStatus,
              highWorkload);
          break;
        }
      case RegisterWorker:
        {
          String host = request.getRegisterWorkerRequest().getHost();
          int rpcPort = request.getRegisterWorkerRequest().getRpcPort();
          int pushPort = request.getRegisterWorkerRequest().getPushPort();
          int fetchPort = request.getRegisterWorkerRequest().getFetchPort();
          int replicatePort = request.getRegisterWorkerRequest().getReplicatePort();
          String networkLocation = request.getRegisterWorkerRequest().getNetworkLocation();
          int internalPort = request.getRegisterWorkerRequest().getInternalPort();
          Map<String, DiskInfo> diskInfos =
              MetaUtil.fromPbDiskInfos(request.getRegisterWorkerRequest().getDisksMap());
          LOG.debug(
              "Handle worker register for {} {} {} {} {} {} {}",
              host,
              rpcPort,
              pushPort,
              fetchPort,
              replicatePort,
              internalPort,
              diskInfos);
          metaSystem.updateRegisterWorkerMeta(
              host,
              rpcPort,
              pushPort,
              fetchPort,
              replicatePort,
              internalPort,
              networkLocation,
              diskInfos);
          break;
        }
      case ReportWorkerUnavailable:
        metaSystem.updateMetaByReportWorkerUnavailable(
            toWorkerInfos(request.getReportWorkerUnavailableRequest().getUnavailableList()));
        break;
      case UpdatePartitionSize:
        metaSystem.updatePartitionSize();
        break;
      case RemoveWorkersUnavailableInfo:
        metaSystem.removeWorkersUnavailableInfoMeta(
            toWorkerInfos(request.getRemoveWorkersUnavailableInfoRequest().getUnavailableList()));
        break;
      case WorkerEvent:
        {
          List<WorkerInfo> workerInfoList =
              toWorkerInfos(request.getWorkerEventRequest().getWorkerAddressList());
          // The event type is forwarded as its protobuf enum number.
          metaSystem.updateWorkerEventMeta(
              request.getWorkerEventRequest().getWorkerEventType().getNumber(), workerInfoList);
          break;
        }
      case ApplicationMeta:
        {
          String appId = request.getApplicationMetaRequest().getAppId();
          String secret = request.getApplicationMetaRequest().getSecret();
          metaSystem.updateApplicationMeta(new ApplicationMeta(appId, secret));
          break;
        }
      case ReportWorkerDecommission:
        metaSystem.updateMetaByReportWorkerDecommission(
            toWorkerInfos(request.getReportWorkerDecommissionRequest().getWorkersList()));
        break;
      default:
        // Keep a separator between the prefix and the request dump so the message is readable.
        throw new IOException("Can not parse this command! " + request);
    }
    responseBuilder.setStatus(ResourceProtos.Status.OK);
  } catch (IOException e) {
    LOG.warn("Handle meta write request {} failed!", cmdType, e);
    responseBuilder.setSuccess(false);
    responseBuilder.setStatus(ResourceProtos.Status.INTERNAL_ERROR);
    if (e.getMessage() != null) {
      responseBuilder.setMessage(e.getMessage());
    }
  }
  return responseBuilder.build();
}

/**
 * Converts protobuf worker addresses to {@link WorkerInfo}s using {@code MetaUtil.addrToInfo}.
 *
 * @param addresses worker addresses carried by a write request
 * @return the converted workers, in the same order as {@code addresses}
 */
private static List<WorkerInfo> toWorkerInfos(List<ResourceProtos.WorkerAddress> addresses) {
  return addresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
}