in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/dispatcher/DispatcherImpl.java [42:100]
/**
 * Converts a gRPC response into the {@link DispatcherResponse} that should be acted upon.
 *
 * <p>When the response already carries an explicit dispatcher code, that code is used as-is and
 * the gRPC status is never consulted. Otherwise the gRPC status code is translated according to
 * the table encoded in the switch below; any status without a dedicated mapping resolves to
 * {@code INVALID}.
 *
 * @param resp the gRPC response to translate
 * @return a new {@link DispatcherResponse} wrapping the resolved code
 */
public static DispatcherResponse dispatcherResponseFromGrpcStatus(GrpcResponse resp) {
  // An explicit code on the response takes precedence over any status-based mapping.
  if (resp.code().isPresent()) {
    return new DispatcherResponse(resp.code().get());
  }

  final DispatcherResponse.Code mapped;
  switch (resp.status().getCode()) {
    case OK:
      // OK is always treated as COMMIT. In practice onCompleted (rather than onError) is
      // expected for OK, but gRPC Status OK -> Kafka COMMIT is part of the API contract.
      mapped = DispatcherResponse.Code.COMMIT;
      break;

    case ALREADY_EXISTS:
      mapped = DispatcherResponse.Code.SKIP;
      break;

    case RESOURCE_EXHAUSTED:
      // gRPC Status RESOURCE_EXHAUSTED -> Kafka RETRY is part of the API contract.
      mapped = DispatcherResponse.Code.RETRY;
      break;

    // gRPC Status FAILED_PRECONDITION -> Kafka DLQ is part of the API contract; the other
    // statuses in this group are routed to the DLQ as well.
    case NOT_FOUND:
    case INVALID_ARGUMENT:
    case FAILED_PRECONDITION:
    case ABORTED:
    case OUT_OF_RANGE:
    case DATA_LOSS:
      mapped = DispatcherResponse.Code.DLQ;
      break;

    case DEADLINE_EXCEEDED:
      mapped = DispatcherResponse.Code.BACKOFF;
      break;

    case UNAVAILABLE:
      // Some reverse proxies (including envoy and muttley) convert a timeout error into gRPC
      // code UNAVAILABLE, see
      // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
      // so back off only when the response is overdue; otherwise do not back off.
      mapped =
          resp.isOverDue() ? DispatcherResponse.Code.BACKOFF : DispatcherResponse.Code.INVALID;
      break;

    case UNKNOWN:
      // Log the detailed response at debug level to help diagnose UNKNOWN codes.
      LOGGER.debug("Dispatcher response with code UNKNOWN: {}", resp);
      mapped = DispatcherResponse.Code.INVALID;
      break;

    // PERMISSION_DENIED: returned when the provided SPIFFE ids don't have authorization to
    // consume.
    // UNAUTHENTICATED: returned when the provided SPIFFE ids don't match the muttley uri.
    case CANCELLED:
    case UNIMPLEMENTED:
    case INTERNAL:
    case PERMISSION_DENIED:
    case UNAUTHENTICATED:
    default:
      mapped = DispatcherResponse.Code.INVALID;
      break;
  }
  return new DispatcherResponse(mapped);
}