in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/rpc/ControllerAdminService.java [295:348]
public void getClusterScaleStatus(
    com.uber.data.kafka.datatransfer.GetClusterScaleStatusRequest request,
    StreamObserver<GetClusterScaleStatusResponse> responseObserver) {
  // Builds a cluster-wide scale summary and sends it on the response observer:
  //   - number of workers in WORKER_STATE_WORKING and job groups in JOB_STATE_RUNNING,
  //   - allowed throughput = working worker count * per-worker limits from the
  //     auto-scalar configuration,
  //   - used throughput = sum over running job groups of their scale-status rates.
  // Any exception raised while computing or sending the summary is logged and reported
  // to the caller as an INTERNAL status.
  Instrumentation.instrument.withStreamObserver(
      logger,
      infra.scope(),
      infra.tracer(),
      (req, resW) -> {
        try {
          // Per-worker capacity limits, read once per request.
          long perWorkerMessageLimit = autoScalarConfiguration.getMessagesPerSecPerWorker();
          long perWorkerByteLimit = autoScalarConfiguration.getBytesPerSecPerWorker();

          Map<Long, Versioned<StoredWorker>> workingWorkers =
              workerStore.getAll(w -> WorkerState.WORKER_STATE_WORKING == w.getState());
          Collection<Versioned<StoredJobGroup>> runningJobGroups =
              jobGroupStore.getAll(g -> g.getState().equals(JobState.JOB_STATE_RUNNING)).values();

          // Byte usage is the plain sum of every running job group's reported byte rate.
          double usedBytesPerSec =
              runningJobGroups.stream()
                  .mapToDouble(g -> g.model().getScaleStatus().getTotalBytesPerSec())
                  .sum();

          // Message usage clamps each job group's reported rate to the per-worker message
          // limit before summing.
          // NOTE(review): the byte rate above is not clamped the same way - confirm that
          // this asymmetry is intended.
          double messageCap = perWorkerMessageLimit;
          double usedMessagesPerSec =
              runningJobGroups.stream()
                  .mapToDouble(g -> g.model().getScaleStatus().getTotalMessagesPerSec())
                  .map(rate -> Math.min(rate, messageCap))
                  .sum();

          int workerCount = workingWorkers.size();
          double allowedMessagesPerSec = workerCount * (double) perWorkerMessageLimit;
          double allowedBytesPerSec = workerCount * (double) perWorkerByteLimit;

          GetClusterScaleStatusResponse.Builder summary =
              GetClusterScaleStatusResponse.newBuilder();
          summary.setTotalJobgroupCount(runningJobGroups.size());
          summary.setTotalWorkerCount(workerCount);
          summary.setTotalAllowedMessagesPerSec(allowedMessagesPerSec);
          summary.setTotalAllowedBytesPerSec(allowedBytesPerSec);
          summary.setTotalUsedBytesPerSec(usedBytesPerSec);
          summary.setTotalUsedMessagesPerSec(usedMessagesPerSec);

          resW.onNext(summary.build());
          resW.onCompleted();
        } catch (Exception e) {
          logger.error("Failed to get cluster scale status", e);
          // Keep the original exception as the cause of the INTERNAL status.
          Status failure =
              Status.INTERNAL.withDescription("Failed to get cluster scale status").withCause(e);
          resW.onError(failure.asRuntimeException());
        }
      },
      request,
      responseObserver,
      "clusterscalestatusservice.getclusterscalestatus");
}