protected void channelRead0()

in tchannel-core/src/main/java/com/uber/tchannel/handlers/RequestRouter.java [87:193]


    /**
     * Dispatches one inbound request to its handler and wires up delivery of the outcome.
     *
     * <p>Processing steps, in order:
     * <ol>
     *   <li>If the channel is no longer active, the request is released, a warning is logged
     *       and nothing else happens.</li>
     *   <li>A request without an arg scheme, service name or endpoint is reported through
     *       {@code sendError} as {@code ErrorType.BadRequest}.</li>
     *   <li>The handler is looked up by service and endpoint, falling back to the default user
     *       handler of {@code topChannel}; if neither exists the request is reported as
     *       {@code ErrorType.BadRequest}.</li>
     *   <li>An {@code AsyncRequestHandler} is invoked via {@code sendRequestToAsyncHandler};
     *       any other handler is wrapped in a {@code CallableHandler} and submitted to
     *       {@code listeningExecutorService}.</li>
     *   <li>A callback registered on the resulting future (and run on
     *       {@code listeningExecutorService}) either queues the response and schedules
     *       {@code sendResponse} on the channel's event loop, or reports the failure through
     *       {@code sendError}.</li>
     * </ol>
     *
     * @param ctx     context of the channel the request arrived on
     * @param request the inbound request to route
     */
    protected void channelRead0(final ChannelHandlerContext ctx, final Request request) {

        // A request that arrives on a destroyed connection cannot be answered: free it and stop.
        if (!ctx.channel().isActive()) {
            request.release();
            logger.warn("drop request when channel is inActive");
            return;
        }

        // Reject requests that lack the information needed for routing.
        if (request.getArgScheme() == null) {
            sendError(ErrorType.BadRequest, "Expected incoming call to have \"as\" header set", request, ctx);
            return;
        }

        final String serviceName = request.getService();
        if (serviceName == null || serviceName.isEmpty()) {
            sendError(ErrorType.BadRequest, "Expected incoming call to have serviceName", request, ctx);
            return;
        }

        // The assumption here is that endpoints are always UTF-8 encoded.
        final String endpointName = request.getEndpoint();
        if (endpointName == null || endpointName.isEmpty()) {
            sendError(ErrorType.BadRequest, "Expected incoming call to have endpoint", request, ctx);
            return;
        }

        // Prefer the handler registered for this service/endpoint; otherwise use the default one.
        RequestHandler target = getRequestHandler(serviceName, endpointName);
        if (target == null) {
            target = topChannel.getDefaultUserHandler();
        }
        if (target == null) {
            final String reason = "No handler function for service:endpoint=" + serviceName + ':' + endpointName;
            sendError(ErrorType.BadRequest, reason, request, ctx);
            return;
        }

        ListenableFuture<? extends Response> pending;
        try {
            if (target instanceof AsyncRequestHandler) {
                // Async handlers produce their own future, so no executor task is submitted;
                // the down-cast is required to reach the async entry point.
                pending = sendRequestToAsyncHandler((AsyncRequestHandler) target, request);
            } else {
                pending = listeningExecutorService.submit(new CallableHandler(target, topChannel, request));
            }
        } catch (Throwable dispatchFailure) {
            // Dispatch itself failed: release the request and route the error through the
            // same failure callback used for handler errors.
            request.releaseQuietly();
            pending = Futures.immediateFailedFuture(dispatchFailure);
        }

        Futures.addCallback(pending, new FutureCallback<Response>() {

            @Override
            public void onSuccess(Response response) {
                // Nobody is left to receive the response once the channel has gone away.
                if (!ctx.channel().isActive()) {
                    response.release();
                    return;
                }

                responseQueue.offer(response);
                ctx.channel().eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        sendResponse(ctx);
                    }
                });
            }

            @Override
            public void onFailure(@NotNull Throwable throwable) {
                logger.error("Failed to handle the request due to exception.", throwable);

                ErrorType resolvedType = null;
                String resolvedMessage = null;

                // Take the first ProtocolError in the chain, starting with the throwable itself
                // and then following its causes.
                for (Throwable link = throwable; link != null; link = link.getCause()) {
                    if (link instanceof ProtocolError) {
                        final ProtocolError protocolError = (ProtocolError) link;
                        resolvedType = protocolError.getErrorType();
                        resolvedMessage = protocolError.getMessage();
                        break;
                    }
                }

                // Fall back to generic values for anything the chain did not supply.
                if (resolvedType == null) {
                    resolvedType = ErrorType.UnexpectedError;
                }
                if (resolvedMessage == null) {
                    resolvedMessage = "Failed to handle the request: " + throwable.getMessage();
                }
                sendError(resolvedType, resolvedMessage, request, ctx);
            }

        }, listeningExecutorService);
    }