public void getClusterScaleStatus()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/rpc/ControllerAdminService.java [295:348]


  /**
   * Answers a cluster scale status request with aggregate capacity and usage figures.
   *
   * <p>The response carries:
   *
   * <ul>
   *   <li>the number of workers whose state is {@code WORKER_STATE_WORKING};
   *   <li>the number of job groups whose state is {@code JOB_STATE_RUNNING};
   *   <li>the allowed message and byte throughput, computed as the worker count multiplied by the
   *       configured per-worker limits;
   *   <li>the used byte throughput, summed over the running job groups' scale status;
   *   <li>the used message throughput, summed over the running job groups after clamping each
   *       group's value to the per-worker message limit.
   * </ul>
   *
   * <p>NOTE(review): only the message rate is clamped per job group; the byte rate is summed
   * unclamped. Confirm that this asymmetry is intended.
   *
   * <p>Any exception raised while gathering the figures is logged and reported to the caller as a
   * {@code Status.INTERNAL} error carrying the original exception as its cause.
   *
   * @param request the incoming request; it is handed to the instrumentation wrapper and not
   *     otherwise read here
   * @param responseObserver observer that receives either the single response or the error
   */
  public void getClusterScaleStatus(
      com.uber.data.kafka.datatransfer.GetClusterScaleStatusRequest request,
      StreamObserver<GetClusterScaleStatusResponse> responseObserver) {
    Instrumentation.instrument.withStreamObserver(
        logger,
        infra.scope(),
        infra.tracer(),
        (req, resW) -> {
          try {
            // Per-worker throughput limits from the auto-scalar configuration.
            final long perWorkerMessageLimit = autoScalarConfiguration.getMessagesPerSecPerWorker();
            final long perWorkerByteLimit = autoScalarConfiguration.getBytesPerSecPerWorker();

            // Only workers that are currently working contribute capacity.
            final Map<Long, Versioned<StoredWorker>> workingWorkers =
                workerStore.getAll(worker -> worker.getState() == WorkerState.WORKER_STATE_WORKING);

            // Only running job groups contribute usage.
            final Collection<Versioned<StoredJobGroup>> runningJobGroups =
                jobGroupStore
                    .getAll(jobGroup -> jobGroup.getState().equals(JobState.JOB_STATE_RUNNING))
                    .values();

            final double usedBytesPerSec =
                runningJobGroups.stream()
                    .mapToDouble(group -> group.model().getScaleStatus().getTotalBytesPerSec())
                    .sum();

            // Each job group's message rate is clamped to the per-worker limit before summing.
            final double usedMessagesPerSec =
                runningJobGroups.stream()
                    .mapToDouble(group -> group.model().getScaleStatus().getTotalMessagesPerSec())
                    .map(rate -> Math.min(rate, (double) perWorkerMessageLimit))
                    .sum();

            final int workerCount = workingWorkers.size();

            GetClusterScaleStatusResponse.Builder status =
                GetClusterScaleStatusResponse.newBuilder();
            status.setTotalWorkerCount(workerCount);
            status.setTotalJobgroupCount(runningJobGroups.size());
            status.setTotalAllowedBytesPerSec(workerCount * (double) perWorkerByteLimit);
            status.setTotalAllowedMessagesPerSec(workerCount * (double) perWorkerMessageLimit);
            status.setTotalUsedMessagesPerSec(usedMessagesPerSec);
            status.setTotalUsedBytesPerSec(usedBytesPerSec);

            resW.onNext(status.build());
            resW.onCompleted();
          } catch (Exception e) {
            // The same message is used for the log entry and the gRPC status description.
            final String failure = "Failed to get cluster scale status";
            logger.error(failure, e);
            resW.onError(
                Status.INTERNAL.withDescription(failure).withCause(e).asRuntimeException());
          }
        },
        request,
        responseObserver,
        "clusterscalestatusservice.getclusterscalestatus");
  }