public Single handle()

in servicetalk-http-api/src/main/java/io/servicetalk/http/api/BlockingStreamingToStreamingService.java [61:153]


    public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
                                                final StreamingHttpRequest request,
                                                final StreamingHttpResponseFactory responseFactory) {
        return new Single<StreamingHttpResponse>() {
            @Override
            protected void handleSubscribe(final Subscriber<? super StreamingHttpResponse> subscriber) {
                final ThreadInterruptingCancellable tiCancellable = new ThreadInterruptingCancellable(currentThread());
                try {
                    subscriber.onSubscribe(tiCancellable);
                } catch (Throwable cause) {
                    handleExceptionFromOnSubscribe(subscriber, cause);
                    return;
                }

                // This exists to help users with error propagation. If the user closes the payloadWriter and they throw
                // (e.g. try-with-resources) this processor is merged with the payloadWriter Publisher so the error will
                // still be propagated.
                final Processor exceptionProcessor = newCompletableProcessor();
                final BufferHttpPayloadWriter payloadWriter = new BufferHttpPayloadWriter(
                        ctx.headersFactory().newTrailers());
                DefaultBlockingStreamingHttpServerResponse response = null;
                try {
                    final Consumer<DefaultHttpResponseMetaData> sendMeta = (metaData) -> {
                        final DefaultStreamingHttpResponse result;
                        try {
                            // transfer-encoding takes precedence over content-length.
                            // > When a message does not have a Transfer-Encoding header field, a
                            // Content-Length header field can provide the anticipated size.
                            // https://tools.ietf.org/html/rfc7230#section-3.3.2
                            final HttpHeaders headers = metaData.headers();
                            final HttpProtocolVersion version = metaData.version();
                            boolean addTrailers = version.major() > 1 || isTransferEncodingChunked(headers);
                            if (!addTrailers && h1TrailersSupported(version) && !hasContentLength(headers) &&
                                    // HEAD responses MUST never carry a payload, adding chunked makes no sense and
                                    // breaks our HttpResponseDecoder
                                    !HEAD.equals(request.method())) {
                                // this is likely not supported in http/1.0 and it is possible that a response has
                                // neither header and the connection close indicates the end of the response.
                                // https://tools.ietf.org/html/rfc7230#section-3.3.3
                                headers.add(TRANSFER_ENCODING, CHUNKED);
                                addTrailers = true;
                            }
                            Publisher<Object> messageBody = fromSource(exceptionProcessor)
                                    .merge(payloadWriter.connect());
                            if (addTrailers) {
                                messageBody = messageBody.concat(succeeded(payloadWriter.trailers()));
                            }
                            messageBody = messageBody.beforeSubscription(() -> new Subscription() {
                                @Override
                                public void request(final long n) {
                                }

                                @Override
                                public void cancel() {
                                    tiCancellable.cancel();
                                }
                            });
                            result = new DefaultStreamingHttpResponse(metaData.status(), version, headers,
                                    metaData.context0(), ctx.executionContext().bufferAllocator(), messageBody,
                                    forTransportReceive(false, version, headers), ctx.headersFactory());
                        } catch (Throwable t) {
                            subscriber.onError(t);
                            throw t;
                        }
                        subscriber.onSuccess(result);
                    };

                    response = new DefaultBlockingStreamingHttpServerResponse(OK, request.version(),
                            ctx.headersFactory().newHeaders(), payloadWriter,
                            ctx.executionContext().bufferAllocator(), sendMeta);
                    original.handle(ctx, request.toBlockingStreamingRequest(), response);

                    // The user code has returned successfully, complete the processor so the response stream can
                    // complete. If the user handles the request asynchronously (e.g. on another thread) they are
                    // responsible for closing the payloadWriter.
                    exceptionProcessor.onComplete();
                } catch (Throwable cause) {
                    tiCancellable.setDone(cause);
                    if (response == null || response.markMetaSent()) {
                        safeOnError(subscriber, cause);
                    } else {
                        try {
                            exceptionProcessor.onError(cause);
                        } finally {
                            safeClose(payloadWriter, cause);
                        }
                    }
                    return;
                }
                tiCancellable.setDone();
            }
        };
    }