private void reportHealthIssues()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/dispatcher/DispatcherImpl.java [238:267]


  private void reportHealthIssues(Job job, GrpcResponse resp) {
    if (!resp.code().isEmpty()) {
      return;
    }
    Status.Code statusCode = resp.status().getCode();
    final AtomicReference<KafkaPipelineIssue> issue = new AtomicReference<>();

    if (statusCode == UNAVAILABLE
        || statusCode == UNKNOWN
        || statusCode == UNIMPLEMENTED
        || statusCode == INTERNAL) {
      issue.set(KafkaPipelineIssue.INVALID_RESPONSE_RECEIVED);
    } else if (statusCode == PERMISSION_DENIED || statusCode == UNAUTHENTICATED) {
      issue.set(KafkaPipelineIssue.PERMISSION_DENIED);
    } else {
      LatencyTracker.Stats sample = latencyStatsSampler.get();
      if (sample.isMature()) {
        if (sample.isMedianLatencyHigh()) {
          issue.set(KafkaPipelineIssue.MEDIAN_RPC_LATENCY_HIGH);
        } else if (sample.isMaxLatencyHigh()) {
          issue.set(KafkaPipelineIssue.MAX_RPC_LATENCY_HIGH);
        }
      }
    }

    if (issue.get() != null) {
      Preconditions.checkNotNull(pipelineStateManager, "pipeline config manager required");
      pipelineStateManager.reportIssue(job, issue.get().getPipelineHealthIssue());
    }
  }