in http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java [285:389]
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
    // Bridges the HttpContent stream published by `response` to the given ByteBuffer subscriber.
    // Cancel, error and complete each flip `isDone` with a compare-and-set before acting, so only
    // the first of those signals to arrive does any work; later ones are ignored.
    response.subscribe(new Subscriber<HttpContent>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            // The downstream subscriber sees a wrapped subscription that is handed onCancel as a
            // callback (presumably invoked when the subscriber cancels -- see OnCancelSubscription).
            Subscription upstream = resolveSubscription(subscription);
            subscriber.onSubscribe(new OnCancelSubscription(upstream, this::onCancel));
        }

        // Picks the subscription to expose downstream, based on the channel's protocol.
        private Subscription resolveSubscription(Subscription subscription) {
            if (ChannelAttributeKey.getProtocolNow(channelContext.channel()) != Protocol.HTTP2) {
                return subscription;
            }
            // For HTTP2 we send a RST_STREAM frame on cancel to stop the service from sending more data
            return new Http2ResetSendingSubscription(channelContext, subscription);
        }

        // Fails the execute future with a cancellation exception, then closes and releases the channel.
        private void onCancel() {
            if (isDone.compareAndSet(false, true)) {
                String message = "Subscriber cancelled before all events were published";
                try {
                    SdkCancellationException cancellation = new SdkCancellationException(message);
                    log.debug(channelContext.channel(), () -> message);
                    executeFuture.completeExceptionally(cancellation);
                } finally {
                    closeAndReleaseQuietly();
                }
            }
        }

        @Override
        public void onNext(HttpContent httpContent) {
            // isDone may be true if the subscriber cancelled
            if (isDone.get()) {
                ReferenceCountUtil.release(httpContent);
                return;
            }

            // Needed to prevent use-after-free bug if the subscriber's onNext is asynchronous:
            // the content is copied, and the original is released via the "finally" action.
            ByteBuffer copy = tryCatchFinally(() -> copyToByteBuffer(httpContent.content()),
                                              this::onError,
                                              httpContent::release);

            // Nothing to deliver (presumably the copy failed and onError has already run -- see tryCatchFinally).
            if (copy == null) {
                return;
            }

            // As per reactive-streams rule 2.13, we should not call subscriber#onError when an
            // exception is thrown from subscriber#onNext, so only the handler/future are notified.
            tryCatch(() -> subscriber.onNext(copy), this::notifyError);
        }

        @Override
        public void onError(Throwable t) {
            if (isDone.compareAndSet(false, true)) {
                try {
                    // Downstream subscriber first, then the response handler and the execute future.
                    runAndLogError(channelContext.channel(),
                                   () -> String.format("Subscriber %s threw an exception in onError.", subscriber),
                                   () -> subscriber.onError(t));
                    notifyError(t);
                } finally {
                    closeAndReleaseQuietly();
                }
            }
        }

        @Override
        public void onComplete() {
            // For HTTP/2 it's possible to get an onComplete after we cancel due to the channel becoming
            // inactive. We guard against that here and just ignore the signal (see HandlerPublisher)
            if (isDone.compareAndSet(false, true)) {
                try {
                    validateResponseContentLength(channelContext);
                    try {
                        runAndLogError(channelContext.channel(),
                                       () -> String.format("Subscriber %s threw an exception in onComplete.",
                                                           subscriber),
                                       subscriber::onComplete);
                    } finally {
                        // Runs even if signalling the subscriber failed.
                        finalizeResponse(requestContext, channelContext);
                    }
                } catch (IOException ioFailure) {
                    // NOTE(review): when validateResponseContentLength throws, the downstream subscriber
                    // receives neither onComplete nor onError here; only the handler and executeFuture
                    // are notified -- confirm this is intended.
                    notifyError(ioFailure);
                    closeAndReleaseQuietly();
                }
            }
        }

        // Reports the failure to the request's response handler, then fails the execute future.
        private void notifyError(Throwable throwable) {
            SdkAsyncHttpResponseHandler responseHandler = requestContext.handler();
            runAndLogError(channelContext.channel(),
                           () -> String.format("SdkAsyncHttpResponseHandler %s threw an exception in onError.",
                                               responseHandler),
                           () -> responseHandler.onError(throwable));
            executeFuture.completeExceptionally(throwable);
        }

        // Closes and releases the channel through runAndLogError, using the shared log message below.
        private void closeAndReleaseQuietly() {
            runAndLogError(channelContext.channel(),
                           () -> "Could not release channel back to the pool",
                           () -> closeAndRelease(channelContext));
        }
    });
}