public void subscribe()

in http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java [285:389]


        /**
         * Subscribes {@code subscriber} to the response body.
         *
         * <p>An adapter {@code Subscriber<HttpContent>} is registered on {@code response}. Every chunk it receives is
         * copied into a {@link ByteBuffer} and handed to {@code subscriber}. The {@code isDone} flag is flipped with a
         * compare-and-set by whichever of cancel, error or completion arrives first, so the terminal handling in this
         * adapter runs at most once; chunks arriving after that are released without being forwarded.
         *
         * @param subscriber the downstream consumer of the response body bytes
         */
        public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
            response.subscribe(new Subscriber<HttpContent>() {
                @Override
                public void onSubscribe(Subscription subscription) {
                    // Wrap the upstream subscription so a downstream cancel() also triggers onCancel() below.
                    Subscription upstream = resolveSubscription(subscription);
                    subscriber.onSubscribe(new OnCancelSubscription(upstream, this::onCancel));
                }

                @Override
                public void onNext(HttpContent httpContent) {
                    // isDone may already be set if the subscriber cancelled; drop the chunk but still release it.
                    if (isDone.get()) {
                        ReferenceCountUtil.release(httpContent);
                        return;
                    }

                    // Copy before forwarding: the chunk is released right after the copy, so an asynchronous
                    // subscriber#onNext must not be handed the original buffer (use-after-free otherwise).
                    ByteBuffer copied =
                        tryCatchFinally(() -> copyToByteBuffer(httpContent.content()),
                                        this::onError,
                                        httpContent::release);

                    if (copied == null) {
                        return;
                    }

                    // Reactive-streams rule 2.13: an exception thrown from subscriber#onNext must not be reported
                    // through subscriber#onError, so it goes to notifyError instead.
                    tryCatch(() -> subscriber.onNext(copied),
                             this::notifyError);
                }

                @Override
                public void onError(Throwable t) {
                    if (isDone.compareAndSet(false, true)) {
                        try {
                            runAndLogError(channelContext.channel(),
                                           () -> String.format("Subscriber %s threw an exception in onError.", subscriber),
                                           () -> subscriber.onError(t));
                            notifyError(t);
                        } finally {
                            closeAndReleaseChannel();
                        }
                    }
                }

                @Override
                public void onComplete() {
                    // For HTTP/2 an onComplete can still arrive after a cancel when the channel becomes inactive
                    // (see HandlerPublisher); a failed compare-and-set means a terminal path already ran, so the
                    // signal is ignored.
                    if (isDone.compareAndSet(false, true)) {
                        try {
                            validateResponseContentLength(channelContext);
                            try {
                                runAndLogError(channelContext.channel(),
                                               () -> String.format("Subscriber %s threw an exception in onComplete.",
                                                                   subscriber),
                                               subscriber::onComplete);
                            } finally {
                                finalizeResponse(requestContext, channelContext);
                            }
                        } catch (IOException e) {
                            // NOTE(review): on this path the failure goes to the response handler and the execute
                            // future only; this method does not signal the subscriber directly - confirm intended.
                            notifyError(e);
                            closeAndReleaseChannel();
                        }
                    }
                }

                /**
                 * Picks the subscription to expose downstream: for HTTP2 a wrapper that sends a RST_STREAM frame on
                 * cancel to stop the service from sending more data, otherwise the upstream subscription unchanged.
                 */
                private Subscription resolveSubscription(Subscription subscription) {
                    boolean http2 = ChannelAttributeKey.getProtocolNow(channelContext.channel()) == Protocol.HTTP2;
                    return http2 ? new Http2ResetSendingSubscription(channelContext, subscription) : subscription;
                }

                /**
                 * Cancellation callback: fails the execute future with an {@code SdkCancellationException} and then
                 * closes and releases the channel. Does nothing if a terminal path already ran.
                 */
                private void onCancel() {
                    if (isDone.compareAndSet(false, true)) {
                        String reason = "Subscriber cancelled before all events were published";
                        try {
                            SdkCancellationException cancellation = new SdkCancellationException(reason);
                            log.debug(channelContext.channel(), () -> reason);
                            executeFuture.completeExceptionally(cancellation);
                        } finally {
                            closeAndReleaseChannel();
                        }
                    }
                }

                /**
                 * Reports {@code throwable} to the request's response handler, logging anything the handler itself
                 * throws, and then completes the execute future exceptionally.
                 */
                private void notifyError(Throwable throwable) {
                    SdkAsyncHttpResponseHandler responseHandler = requestContext.handler();
                    runAndLogError(channelContext.channel(),
                                   () -> String.format("SdkAsyncHttpResponseHandler %s threw an exception in onError.",
                                                       responseHandler),
                                   () -> responseHandler.onError(throwable));
                    executeFuture.completeExceptionally(throwable);
                }

                /** Closes and releases the channel, logging rather than propagating any failure. */
                private void closeAndReleaseChannel() {
                    runAndLogError(channelContext.channel(), () -> "Could not release channel back to the pool",
                                   () -> closeAndRelease(channelContext));
                }

            });
        }