protected abstract AsyncRequestConsumer supplyConsumer()

in httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractServerExchangeHandler.java [76:172]


    /**
     * Supplies the {@link AsyncRequestConsumer} used to process the incoming request message.
     * It is invoked by
     * {@link #handleRequest(HttpRequest, EntityDetails, ResponseChannel, HttpContext)} with the
     * same request, entity details and context that method received; the object produced by
     * the returned consumer is subsequently passed to the {@code handle} method.
     *
     * @param request the incoming request, as passed to {@code handleRequest}.
     * @param entityDetails the request entity details, as passed to {@code handleRequest}.
     * @param context the actual execution context.
     * @return the request consumer. Returning {@code null} causes {@code handleRequest} to fail
     *         with an {@link HttpException} ("Unable to handle request").
     * @throws HttpException propagated unchanged to the caller of {@code handleRequest}.
     */
    protected abstract AsyncRequestConsumer<T> supplyConsumer(
            HttpRequest request,
            EntityDetails entityDetails,
            HttpContext context) throws HttpException;

    /**
     * Triggered to handle the request object produced by the {@link AsyncRequestConsumer} returned
     * from the {@link #supplyConsumer(HttpRequest, EntityDetails, HttpContext)} method. The handler
     * can choose to send response messages immediately inside the call or asynchronously
     * at some later point.
     * <p>
     * When invoked from
     * {@link #handleRequest(HttpRequest, EntityDetails, ResponseChannel, HttpContext)}, an
     * {@link HttpException} thrown by this method is converted into a
     * {@code 500 Internal Server Error} response submitted through the response trigger, whereas
     * an {@link IOException} is reported as a failure of the exchange.
     *
     * @param requestMessage the request message.
     * @param responseTrigger the response trigger.
     * @param context the actual execution context.
     * @throws HttpException in case of an HTTP protocol violation.
     * @throws IOException in case of an I/O error.
     */
    protected abstract void handle(
            T requestMessage,
            AsyncServerRequestHandler.ResponseTrigger responseTrigger,
            HttpContext context) throws HttpException, IOException;

    /**
     * Starts processing of an incoming request.
     * <p>
     * The method obtains a request consumer from
     * {@link #supplyConsumer(HttpRequest, EntityDetails, HttpContext)}, records it in
     * {@code requestConsumerRef}, and asks it to consume the request. Once the consumer completes,
     * the resulting object is passed to the {@code handle} method together with a response
     * trigger bound to the given response channel.
     * <p>
     * Outcome handling performed by the completion callback:
     * <ul>
     * <li>an {@link HttpException} thrown by {@code handle} is turned into a
     * {@code 500 Internal Server Error} response whose entity is the exception message (or the
     * exception's {@code toString()} when it carries no message);</li>
     * <li>an {@link IOException} thrown by {@code handle}, or any failure to submit the error
     * response, is passed to {@code failedInternal};</li>
     * <li>the request consumer is released via {@code releaseRequestConsumer()} once
     * {@code handle} has returned or thrown;</li>
     * <li>a consumer failure is passed to {@code failedInternal}, and a cancellation triggers
     * {@code releaseResourcesInternal()}.</li>
     * </ul>
     *
     * @param request the incoming request.
     * @param entityDetails the request entity details.
     * @param responseChannel the channel used to send informational responses, the final response
     *                        and push promises.
     * @param context the actual execution context.
     * @throws HttpException if no request consumer could be supplied, or if thrown while
     *                       supplying the consumer or initiating request consumption.
     * @throws IOException in case of an I/O error while initiating request consumption.
     */
    @Override
    public final void handleRequest(
            final HttpRequest request,
            final EntityDetails entityDetails,
            final ResponseChannel responseChannel,
            final HttpContext context) throws HttpException, IOException {

        final AsyncRequestConsumer<T> requestConsumer = supplyConsumer(request, entityDetails, context);
        if (requestConsumer == null) {
            throw new HttpException("Unable to handle request");
        }
        requestConsumerRef.set(requestConsumer);
        final AsyncServerRequestHandler.ResponseTrigger responseTrigger = new AsyncServerRequestHandler.ResponseTrigger() {

            @Override
            public void sendInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
                responseChannel.sendInformation(response, httpContext);
            }

            @Override
            public void submitResponse(
                    final AsyncResponseProducer producer, final HttpContext httpContext) throws HttpException, IOException {
                // Only the first submitted producer is accepted; later submissions are ignored.
                if (responseProducerRef.compareAndSet(null, producer)) {
                    producer.sendResponse(responseChannel, httpContext);
                }
            }

            @Override
            public void pushPromise(
                    final HttpRequest promise, final HttpContext httpContext, final AsyncPushProducer pushProducer) throws HttpException, IOException {
                // Note the argument order: the channel takes the producer before the context.
                responseChannel.pushPromise(promise, pushProducer, httpContext);
            }

            @Override
            public String toString() {
                return "Response trigger: " + responseChannel;
            }

        };
        requestConsumer.consumeRequest(request, entityDetails, context, new FutureCallback<T>() {

            @Override
            public void completed(final T result) {
                try {
                    handle(result, responseTrigger, context);
                } catch (final HttpException ex) {
                    // Throwable#getMessage() may be null (e.g. an exception created without a
                    // message); fall back to toString() so the error entity is never built
                    // from a null string.
                    final String message = ex.getMessage();
                    try {
                        responseTrigger.submitResponse(
                                AsyncResponseBuilder.create(HttpStatus.SC_INTERNAL_SERVER_ERROR)
                                        .setEntity(message != null ? message : ex.toString())
                                        .build(),
                                context);
                    } catch (final HttpException | IOException ex2) {
                        failedInternal(ex2);
                    }
                } catch (final IOException ex) {
                    failedInternal(ex);
                } finally {
                    // The request has been fully consumed at this point, whatever the outcome.
                    releaseRequestConsumer();
                }
            }

            @Override
            public void failed(final Exception ex) {
                failedInternal(ex);
            }

            @Override
            public void cancelled() {
                releaseResourcesInternal();
            }

        });

    }