in http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java [431:487]
/**
 * Subscribes {@code subscriber} to the request body by adapting every {@link ByteBuffer} emitted by
 * {@code publisher} into an {@link HttpContent}.
 *
 * <p>The adapter keeps a running total of bytes handed downstream in {@code written}. It completes the
 * downstream subscriber early (and cancels the upstream subscription) as soon as
 * {@code shouldContinuePublishing()} returns {@code false}, and it fails the stream if upstream completes
 * before the declared content-length has been delivered.
 *
 * @param subscriber the downstream subscriber that receives the converted {@link HttpContent} chunks
 */
public void subscribe(Subscriber<? super HttpContent> subscriber) {
    publisher.subscribe(new Subscriber<ByteBuffer>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            // Remember the upstream subscription so it can be cancelled from onNext/onError.
            StreamedRequest.this.subscription = subscription;
            subscriber.onSubscribe(subscription);
        }

        @Override
        public void onNext(ByteBuffer contentBytes) {
            // Ignore anything delivered after a terminal signal has already been sent downstream.
            if (done) {
                return;
            }

            try {
                // clampedBufferLimit is handed the number of readable bytes and its result is added to
                // 'written' below, so it is treated here as a byte COUNT (presumably capped by the bytes
                // still owed for the content-length -- confirm against clampedBufferLimit).
                int bytesToWrite = clampedBufferLimit(contentBytes.remaining());

                // ByteBuffer.limit(int) takes an ABSOLUTE index. The count must therefore be offset by the
                // current position; otherwise a buffer whose position is non-zero (e.g. a slice or a
                // partially consumed buffer) would expose the wrong window, or no bytes at all.
                contentBytes.limit(contentBytes.position() + bytesToWrite);

                ByteBuf contentByteBuf = Unpooled.wrappedBuffer(contentBytes);
                HttpContent content = new DefaultHttpContent(contentByteBuf);

                subscriber.onNext(content);
                written += bytesToWrite;

                if (!shouldContinuePublishing()) {
                    // Nothing more should be sent: stop upstream and complete downstream.
                    done = true;
                    subscription.cancel();
                    subscriber.onComplete();
                }
            } catch (Throwable t) {
                // Route any failure (including one thrown by the downstream onNext) through onError.
                onError(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            // Deliver at most one terminal signal downstream.
            if (!done) {
                done = true;
                subscription.cancel();
                subscriber.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!done) {
                Long expectedContentLength = requestContentLength.orElse(null);
                if (expectedContentLength != null && written < expectedContentLength) {
                    // Upstream finished before the declared content-length was satisfied.
                    onError(new IllegalStateException("Request content was only " + written + " bytes, but the specified "
                                                      + "content-length was " + expectedContentLength + " bytes."));
                } else {
                    done = true;
                    subscriber.onComplete();
                }
            }
        }
    });
}