public String read()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/management/WorkersJson.java [36:98]


  public String read() throws Exception {
    Map<String, Versioned<StoredJobGroup>> jobGroups = jobGroupStore.getAll();

    // count jobs by worker_id
    Map<Long, Long> jobCountByWorkerId =
        jobGroups
            .values()
            .stream()
            .flatMap(jg -> jg.model().getJobsList().stream())
            .map(j -> j.getWorkerId())
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

    Map<Long, Double> totalLoadByWorkerId =
        jobGroups
            .values()
            .stream()
            .flatMap(jg -> jg.model().getJobsList().stream())
            .collect(
                Collectors.groupingBy(
                    j -> j.getWorkerId(), Collectors.summingDouble(j -> j.getScale())));

    Map<Long, String> workerId2RoutingDest = new HashMap<>();
    jobGroups
        .values()
        .forEach(
            jobGroup ->
                jobGroup
                    .model()
                    .getJobsList()
                    .forEach(
                        storedJob -> {
                          // as of 09/21/2020, one worker only serves on routing destination
                          if (!workerId2RoutingDest.containsKey(storedJob.getWorkerId())) {
                            workerId2RoutingDest.put(
                                storedJob.getWorkerId(),
                                jobGroup
                                    .model()
                                    .getJobGroup()
                                    .getRpcDispatcherTaskGroup()
                                    .getUri());
                          }
                        }));

    // build debug table
    DebugWorkersTable.Builder tableBuilder = DebugWorkersTable.newBuilder();
    for (Map.Entry<Long, Versioned<StoredWorker>> workerEntry : workerStore.getAll().entrySet()) {
      // build debug row
      String hostPort =
          String.format(
              "%s:%d",
              workerEntry.getValue().model().getNode().getHost(),
              workerEntry.getValue().model().getNode().getPort());
      DebugWorkerRow.Builder rowBuilder = DebugWorkerRow.newBuilder();
      rowBuilder.setWorker(workerEntry.getValue().model());
      rowBuilder.setExpectedJobCount(jobCountByWorkerId.getOrDefault(workerEntry.getKey(), 0L));
      rowBuilder.setUrl(workerUrlResolver.resolveLink(workerEntry.getValue().model().getNode()));
      rowBuilder.setRoutingDestination(workerId2RoutingDest.getOrDefault(workerEntry.getKey(), ""));
      rowBuilder.setTotalLoad(totalLoadByWorkerId.getOrDefault(workerEntry.getKey(), 0.0));
      tableBuilder.addData(rowBuilder.build());
    }

    return jsonPrinter.print(tableBuilder.build());
  }