public ResourceResponse handleWriteRequest()

in master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java [91:316]


  /**
   * Applies one metadata write command to {@code metaSystem} and reports the outcome.
   *
   * <p>The command type carried by {@code request} selects which sub-request is read and which
   * {@code metaSystem} update method is invoked. When the selected branch completes, the response
   * status is set to {@code OK}. When an {@link IOException} is raised (including the one thrown
   * for an unrecognized command type), it is logged and the response is marked unsuccessful with
   * status {@code INTERNAL_ERROR} and, if available, the exception message. Only {@link
   * IOException} is caught here; any other exception propagates to the caller.
   *
   * @param request the write command to apply
   * @return the response built from {@code getMasterMetaResponseBuilder(request)} after the
   *     status fields above have been filled in
   */
  public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest request) {
    ResourceProtos.Type cmdType = request.getCmdType();
    ResourceResponse.Builder responseBuilder = getMasterMetaResponseBuilder(request);
    try {
      // Each case that needs locals declares them in its own braces, so no value can leak
      // from one command type into another.
      switch (cmdType) {
        case ReviseLostShuffles:
          {
            String appId = request.getReviseLostShufflesRequest().getAppId();
            List<Integer> lostShuffles =
                request.getReviseLostShufflesRequest().getLostShufflesList();
            LOG.info(
                "Handle revise lost shuffles for {} {}",
                appId,
                StringUtils.join(lostShuffles, ","));
            metaSystem.reviseLostShuffles(appId, lostShuffles);
            break;
          }

        case RequestSlots:
          {
            String shuffleKey = request.getRequestSlotsRequest().getShuffleKey();
            LOG.debug("Handle request slots for {}", shuffleKey);
            metaSystem.updateRequestSlotsMeta(
                shuffleKey, request.getRequestSlotsRequest().getHostName(), new HashMap<>());
            break;
          }

        case ReleaseSlots:
          // No metadata update is performed for this command; it only yields an OK response.
          break;

        case UnRegisterShuffle:
          {
            String shuffleKey = request.getUnregisterShuffleRequest().getShuffleKey();
            LOG.debug("Handle unregister shuffle for {}", shuffleKey);
            metaSystem.updateUnregisterShuffleMeta(shuffleKey);
            break;
          }

        case BatchUnRegisterShuffle:
          {
            List<String> shuffleKeys =
                request.getBatchUnregisterShuffleRequest().getShuffleKeysList();
            metaSystem.updateBatchUnregisterShuffleMeta(shuffleKeys);
            LOG.debug("Handle batch unregister shuffle for {}", shuffleKeys);
            break;
          }

        case AppHeartbeat:
          {
            String appId = request.getAppHeartbeatRequest().getAppId();
            long time = request.getAppHeartbeatRequest().getTime();
            long totalWritten = request.getAppHeartbeatRequest().getTotalWritten();
            long fileCount = request.getAppHeartbeatRequest().getFileCount();
            long shuffleCount = request.getAppHeartbeatRequest().getShuffleCount();
            LOG.debug("Handle app heartbeat for {} with shuffle count {}", appId, shuffleCount);
            Map<String, Long> shuffleFallbackCounts =
                request.getAppHeartbeatRequest().getShuffleFallbackCountsMap();
            if (CollectionUtils.isNotEmpty(shuffleFallbackCounts)) {
              // Surface the total number of fallbacks reported by this heartbeat.
              LOG.warn(
                  "{} shuffle fallbacks in app {}",
                  shuffleFallbackCounts.values().stream().mapToLong(v -> v).sum(),
                  appId);
            }
            metaSystem.updateAppHeartbeatMeta(
                appId, time, totalWritten, fileCount, shuffleCount, shuffleFallbackCounts);
            break;
          }

        case AppLost:
          {
            String appId = request.getAppLostRequest().getAppId();
            LOG.debug("Handle app lost for {}", appId);
            metaSystem.updateAppLostMeta(appId);
            break;
          }

        case WorkerExclude:
          {
            List<WorkerInfo> workersToAdd =
                toWorkerInfos(request.getWorkerExcludeRequest().getWorkersToAddList());
            List<WorkerInfo> workersToRemove =
                toWorkerInfos(request.getWorkerExcludeRequest().getWorkersToRemoveList());
            metaSystem.updateManuallyExcludedWorkersMeta(workersToAdd, workersToRemove);
            break;
          }

        case WorkerLost:
          {
            String host = request.getWorkerLostRequest().getHost();
            int rpcPort = request.getWorkerLostRequest().getRpcPort();
            int pushPort = request.getWorkerLostRequest().getPushPort();
            int fetchPort = request.getWorkerLostRequest().getFetchPort();
            int replicatePort = request.getWorkerLostRequest().getReplicatePort();
            LOG.debug("Handle worker lost for {} {}", host, pushPort);
            metaSystem.updateWorkerLostMeta(host, rpcPort, pushPort, fetchPort, replicatePort);
            break;
          }

        case WorkerRemove:
          {
            // TODO: Remove `WorkerRemove` in 0.7.x version to guarantee upgrade compatibility.
            String host = request.getWorkerRemoveRequest().getHost();
            int rpcPort = request.getWorkerRemoveRequest().getRpcPort();
            int pushPort = request.getWorkerRemoveRequest().getPushPort();
            int fetchPort = request.getWorkerRemoveRequest().getFetchPort();
            int replicatePort = request.getWorkerRemoveRequest().getReplicatePort();
            LOG.debug("Handle worker remove for {} {}", host, pushPort);
            metaSystem.updateWorkerRemoveMeta(host, rpcPort, pushPort, fetchPort, replicatePort);
            break;
          }

        case WorkerHeartbeat:
          {
            String host = request.getWorkerHeartbeatRequest().getHost();
            int rpcPort = request.getWorkerHeartbeatRequest().getRpcPort();
            int pushPort = request.getWorkerHeartbeatRequest().getPushPort();
            int fetchPort = request.getWorkerHeartbeatRequest().getFetchPort();
            Map<String, DiskInfo> diskInfos =
                MetaUtil.fromPbDiskInfos(request.getWorkerHeartbeatRequest().getDisksMap());
            int replicatePort = request.getWorkerHeartbeatRequest().getReplicatePort();
            boolean highWorkload = request.getWorkerHeartbeatRequest().getHighWorkload();
            // A heartbeat that carries no worker status is treated as a normal-status worker.
            WorkerStatus workerStatus =
                request.getWorkerHeartbeatRequest().hasWorkerStatus()
                    ? MetaUtil.fromPbWorkerStatus(
                        request.getWorkerHeartbeatRequest().getWorkerStatus())
                    : WorkerStatus.normalWorkerStatus();

            LOG.debug(
                "Handle worker heartbeat for {} {} {} {} {} {}",
                host,
                rpcPort,
                pushPort,
                fetchPort,
                replicatePort,
                diskInfos);
            metaSystem.updateWorkerHeartbeatMeta(
                host,
                rpcPort,
                pushPort,
                fetchPort,
                replicatePort,
                diskInfos,
                request.getWorkerHeartbeatRequest().getTime(),
                workerStatus,
                highWorkload);
            break;
          }

        case RegisterWorker:
          {
            String host = request.getRegisterWorkerRequest().getHost();
            int rpcPort = request.getRegisterWorkerRequest().getRpcPort();
            int pushPort = request.getRegisterWorkerRequest().getPushPort();
            int fetchPort = request.getRegisterWorkerRequest().getFetchPort();
            int replicatePort = request.getRegisterWorkerRequest().getReplicatePort();
            String networkLocation = request.getRegisterWorkerRequest().getNetworkLocation();
            int internalPort = request.getRegisterWorkerRequest().getInternalPort();
            Map<String, DiskInfo> diskInfos =
                MetaUtil.fromPbDiskInfos(request.getRegisterWorkerRequest().getDisksMap());
            LOG.debug(
                "Handle worker register for {} {} {} {} {} {} {}",
                host,
                rpcPort,
                pushPort,
                fetchPort,
                replicatePort,
                internalPort,
                diskInfos);
            metaSystem.updateRegisterWorkerMeta(
                host,
                rpcPort,
                pushPort,
                fetchPort,
                replicatePort,
                internalPort,
                networkLocation,
                diskInfos);
            break;
          }

        case ReportWorkerUnavailable:
          {
            List<WorkerInfo> failedWorkers =
                toWorkerInfos(request.getReportWorkerUnavailableRequest().getUnavailableList());
            metaSystem.updateMetaByReportWorkerUnavailable(failedWorkers);
            break;
          }

        case UpdatePartitionSize:
          metaSystem.updatePartitionSize();
          break;

        case RemoveWorkersUnavailableInfo:
          {
            List<WorkerInfo> unavailableWorkers =
                toWorkerInfos(
                    request.getRemoveWorkersUnavailableInfoRequest().getUnavailableList());
            metaSystem.removeWorkersUnavailableInfoMeta(unavailableWorkers);
            break;
          }

        case WorkerEvent:
          {
            List<WorkerInfo> workerInfoList =
                toWorkerInfos(request.getWorkerEventRequest().getWorkerAddressList());
            metaSystem.updateWorkerEventMeta(
                request.getWorkerEventRequest().getWorkerEventType().getNumber(), workerInfoList);
            break;
          }

        case ApplicationMeta:
          {
            String appId = request.getApplicationMetaRequest().getAppId();
            String secret = request.getApplicationMetaRequest().getSecret();
            metaSystem.updateApplicationMeta(new ApplicationMeta(appId, secret));
            break;
          }

        case ReportWorkerDecommission:
          {
            List<WorkerInfo> decommissionWorkers =
                toWorkerInfos(request.getReportWorkerDecommissionRequest().getWorkersList());
            metaSystem.updateMetaByReportWorkerDecommission(decommissionWorkers);
            break;
          }

        default:
          // Caught by the handler below, so the caller receives an INTERNAL_ERROR response.
          throw new IOException("Can not parse this command!" + request);
      }
      responseBuilder.setStatus(ResourceProtos.Status.OK);
    } catch (IOException e) {
      LOG.warn("Handle meta write request {} failed!", cmdType, e);
      responseBuilder.setSuccess(false);
      responseBuilder.setStatus(ResourceProtos.Status.INTERNAL_ERROR);
      if (e.getMessage() != null) {
        responseBuilder.setMessage(e.getMessage());
      }
    }
    return responseBuilder.build();
  }

  /** Maps each protobuf worker address to a {@link WorkerInfo} via {@code MetaUtil.addrToInfo}. */
  private static List<WorkerInfo> toWorkerInfos(List<ResourceProtos.WorkerAddress> addresses) {
    return addresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
  }