in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/dispatcher/DispatcherImpl.java [238:267]
private void reportHealthIssues(Job job, GrpcResponse resp) {
if (!resp.code().isEmpty()) {
return;
}
Status.Code statusCode = resp.status().getCode();
final AtomicReference<KafkaPipelineIssue> issue = new AtomicReference<>();
if (statusCode == UNAVAILABLE
|| statusCode == UNKNOWN
|| statusCode == UNIMPLEMENTED
|| statusCode == INTERNAL) {
issue.set(KafkaPipelineIssue.INVALID_RESPONSE_RECEIVED);
} else if (statusCode == PERMISSION_DENIED || statusCode == UNAUTHENTICATED) {
issue.set(KafkaPipelineIssue.PERMISSION_DENIED);
} else {
LatencyTracker.Stats sample = latencyStatsSampler.get();
if (sample.isMature()) {
if (sample.isMedianLatencyHigh()) {
issue.set(KafkaPipelineIssue.MEDIAN_RPC_LATENCY_HIGH);
} else if (sample.isMaxLatencyHigh()) {
issue.set(KafkaPipelineIssue.MAX_RPC_LATENCY_HIGH);
}
}
}
if (issue.get() != null) {
Preconditions.checkNotNull(pipelineStateManager, "pipeline config manager required");
pipelineStateManager.reportIssue(job, issue.get().getPipelineHealthIssue());
}
}