in tchannel-core/src/main/java/com/uber/tchannel/handlers/RequestRouter.java [87:193]
protected void channelRead0(final ChannelHandlerContext ctx, final Request request) {
// There is nothing to do if the connection is already destroyed.
if (!ctx.channel().isActive()) {
request.release();
logger.warn("drop request when channel is inActive");
return;
}
if (request.getArgScheme() == null) {
sendError(ErrorType.BadRequest, "Expected incoming call to have \"as\" header set", request, ctx);
return;
}
final String service = request.getService();
if (service == null || service.isEmpty()) {
sendError(ErrorType.BadRequest, "Expected incoming call to have serviceName", request, ctx);
return;
}
// Get the endpoint. The assumption over here is that endpoints are
// always going to to utf-8 encoded.
String endpoint = request.getEndpoint();
if (endpoint == null || endpoint.isEmpty()) {
sendError(ErrorType.BadRequest, "Expected incoming call to have endpoint", request, ctx);
return;
}
// Get handler for this method
RequestHandler handler = this.getRequestHandler(service, endpoint);
if (handler == null) {
handler = topChannel.getDefaultUserHandler();
}
if (handler == null) {
sendError(
ErrorType.BadRequest,
"No handler function for service:endpoint=" + service + ':' + endpoint,
request,
ctx
);
return;
}
ListenableFuture<? extends Response> responseFuture;
try {
// In case of an AsyncRequestHandler there's no need to submit a task on the executor.
// It does require a down-cast to AsyncRequestHandler.
responseFuture = handler instanceof AsyncRequestHandler
? sendRequestToAsyncHandler((AsyncRequestHandler) handler, request)
: listeningExecutorService.submit(new CallableHandler(handler, topChannel, request));
} catch (Throwable re) {
request.releaseQuietly();
responseFuture = Futures.immediateFailedFuture(re);
}
Futures.addCallback(responseFuture, new FutureCallback<Response>() {
@Override
public void onSuccess(Response response) {
if (ctx.channel().isActive()) {
responseQueue.offer(response);
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
sendResponse(ctx);
}
});
} else {
response.release();
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
logger.error("Failed to handle the request due to exception.", throwable);
ErrorType errorType = null;
String message = null;
if (throwable instanceof ProtocolError) {
ProtocolError protocolError = (ProtocolError) throwable;
errorType = protocolError.getErrorType();
message = protocolError.getMessage();
} else {
Throwable cause = throwable.getCause();
while (cause != null) {
if (cause instanceof ProtocolError) {
ProtocolError protocolError = (ProtocolError) cause;
errorType = protocolError.getErrorType();
message = protocolError.getMessage();
break;
}
cause = cause.getCause();
}
}
if (errorType == null) {
errorType = ErrorType.UnexpectedError;
}
if (message == null) {
message = "Failed to handle the request: " + throwable.getMessage();
}
sendError(errorType, message, request, ctx);
}
}, listeningExecutorService);
}