public ResourceResponse handleWriteRequest()

in master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java [87:242]


  public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest request) {
    ResourceProtos.Type cmdType = request.getCmdType();
    ResourceResponse.Builder responseBuilder = getMasterMetaResponseBuilder(request);
    try {
      String shuffleKey;
      String appId;
      String host;
      int rpcPort;
      int pushPort;
      int fetchPort;
      int replicatePort;
      Map<String, DiskInfo> diskInfos;
      Map<UserIdentifier, ResourceConsumption> userResourceConsumption;
      List<Map<String, Integer>> slots = new ArrayList<>();
      Map<String, Long> estimatedAppDiskUsage = new HashMap<>();
      switch (cmdType) {
        case RequestSlots:
          shuffleKey = request.getRequestSlotsRequest().getShuffleKey();
          LOG.debug("Handle request slots for {}", shuffleKey);
          metaSystem.updateRequestSlotsMeta(
              shuffleKey, request.getRequestSlotsRequest().getHostName(), new HashMap<>());
          break;

        case ReleaseSlots:
          break;

        case UnRegisterShuffle:
          shuffleKey = request.getUnregisterShuffleRequest().getShuffleKey();
          LOG.debug("Handle unregister shuffle for {}", shuffleKey);
          metaSystem.updateUnregisterShuffleMeta(shuffleKey);
          break;

        case AppHeartbeat:
          appId = request.getAppHeartbeatRequest().getAppId();
          LOG.debug("Handle app heartbeat for {}", appId);
          long time = request.getAppHeartbeatRequest().getTime();
          long totalWritten = request.getAppHeartbeatRequest().getTotalWritten();
          long fileCount = request.getAppHeartbeatRequest().getFileCount();
          metaSystem.updateAppHeartbeatMeta(appId, time, totalWritten, fileCount);
          break;

        case AppLost:
          appId = request.getAppLostRequest().getAppId();
          LOG.debug("Handle app lost for {}", appId);
          metaSystem.updateAppLostMeta(appId);
          break;

        case WorkerLost:
          host = request.getWorkerLostRequest().getHost();
          rpcPort = request.getWorkerLostRequest().getRpcPort();
          pushPort = request.getWorkerLostRequest().getPushPort();
          fetchPort = request.getWorkerLostRequest().getFetchPort();
          replicatePort = request.getWorkerLostRequest().getReplicatePort();
          LOG.debug("Handle worker lost for {} {}", host, pushPort);
          metaSystem.updateWorkerLostMeta(host, rpcPort, pushPort, fetchPort, replicatePort);
          break;

        case WorkerRemove:
          host = request.getWorkerRemoveRequest().getHost();
          rpcPort = request.getWorkerRemoveRequest().getRpcPort();
          pushPort = request.getWorkerRemoveRequest().getPushPort();
          fetchPort = request.getWorkerRemoveRequest().getFetchPort();
          replicatePort = request.getWorkerRemoveRequest().getReplicatePort();
          LOG.debug("Handle worker remove for {} {}", host, pushPort);
          metaSystem.updateWorkerRemoveMeta(host, rpcPort, pushPort, fetchPort, replicatePort);
          break;

        case WorkerHeartbeat:
          host = request.getWorkerHeartbeatRequest().getHost();
          rpcPort = request.getWorkerHeartbeatRequest().getRpcPort();
          pushPort = request.getWorkerHeartbeatRequest().getPushPort();
          fetchPort = request.getWorkerHeartbeatRequest().getFetchPort();
          diskInfos = MetaUtil.fromPbDiskInfos(request.getWorkerHeartbeatRequest().getDisksMap());
          userResourceConsumption =
              MetaUtil.fromPbUserResourceConsumption(
                  request.getWorkerHeartbeatRequest().getUserResourceConsumptionMap());
          estimatedAppDiskUsage.putAll(
              request.getWorkerHeartbeatRequest().getEstimatedAppDiskUsageMap());
          replicatePort = request.getWorkerHeartbeatRequest().getReplicatePort();
          LOG.debug(
              "Handle worker heartbeat for {} {} {} {} {} {} {}",
              host,
              rpcPort,
              pushPort,
              fetchPort,
              replicatePort,
              diskInfos,
              userResourceConsumption);
          time = request.getWorkerHeartbeatRequest().getTime();
          metaSystem.updateWorkerHeartbeatMeta(
              host,
              rpcPort,
              pushPort,
              fetchPort,
              replicatePort,
              diskInfos,
              userResourceConsumption,
              estimatedAppDiskUsage,
              time);
          break;

        case RegisterWorker:
          host = request.getRegisterWorkerRequest().getHost();
          rpcPort = request.getRegisterWorkerRequest().getRpcPort();
          pushPort = request.getRegisterWorkerRequest().getPushPort();
          fetchPort = request.getRegisterWorkerRequest().getFetchPort();
          replicatePort = request.getRegisterWorkerRequest().getReplicatePort();
          diskInfos = MetaUtil.fromPbDiskInfos(request.getRegisterWorkerRequest().getDisksMap());
          userResourceConsumption =
              MetaUtil.fromPbUserResourceConsumption(
                  request.getRegisterWorkerRequest().getUserResourceConsumptionMap());
          LOG.debug(
              "Handle worker register for {} {} {} {} {} {} {}",
              host,
              rpcPort,
              pushPort,
              fetchPort,
              replicatePort,
              diskInfos,
              userResourceConsumption);
          metaSystem.updateRegisterWorkerMeta(
              host,
              rpcPort,
              pushPort,
              fetchPort,
              replicatePort,
              diskInfos,
              userResourceConsumption);
          break;

        case ReportWorkerUnavailable:
          List<ResourceProtos.WorkerAddress> failedAddress =
              request.getReportWorkerUnavailableRequest().getUnavailableList();
          List<WorkerInfo> failedWorkers =
              failedAddress.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
          metaSystem.updateMetaByReportWorkerUnavailable(failedWorkers);
          break;

        case UpdatePartitionSize:
          metaSystem.updatePartitionSize();
          break;

        default:
          throw new IOException("Can not parse this command!" + request);
      }
      responseBuilder.setStatus(ResourceProtos.Status.OK);
    } catch (IOException e) {
      LOG.warn("Handle meta write request " + cmdType + " failed!", e);
      responseBuilder.setSuccess(false);
      responseBuilder.setStatus(ResourceProtos.Status.INTERNAL_ERROR);
      if (e.getMessage() != null) {
        responseBuilder.setMessage(e.getMessage());
      }
    }
    return responseBuilder.build();
  }