in services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtRequestBodyStreamAdapter.java [62:143]
public boolean sendRequestBody(ByteBuffer outBuffer) {
    // Copies published data into outBuffer until it is full or the stream reaches a terminal event.
    //
    // Returns true when a COMPLETE event is encountered, and false when outBuffer was filled first.
    // An ERROR event is rethrown: as-is if it is a RuntimeException, otherwise wrapped in one.
    LOG.trace(() -> "Getting data to fill buffer of size " + outBuffer.remaining());

    // The Reactive Streams spec guarantees that onSubscribe precedes every other signal, so a
    // subscription will always show up eventually; block here until it has.
    waitForSubscription();

    // Event loop. The spec orders signals as "onSubscribe onNext* (onError | onComplete)?".
    // onSubscribe is not modelled as an event; the loop is simply not entered before a subscription
    // exists. DATA events are requested and drained until the buffer is full (the supplier API
    // expects the buffer to be filled entirely); COMPLETE and ERROR leave the method immediately.
    while (outBuffer.hasRemaining()) {
        // Nothing queued and no outstanding demand: ask the publisher for another batch.
        if (eventBuffer.isEmpty() && pending == 0) {
            pending = DEFAULT_REQUEST_SIZE;
            subscription.request(pending);
        }

        Event event = takeFirstEvent();

        // Events that belong to a subscriber other than the current one are dropped.
        if (!event.subscriber().equals(subscriber)) {
            LOG.debug(() -> "Received an event for a previous publisher. Discarding. Event was: " + event);
            continue;
        }

        switch (event.type()) {
            case DATA: {
                ByteBuffer payload = ((DataEvent) event).data();
                int copyLength = Math.min(outBuffer.remaining(), payload.remaining());

                // Work on a duplicate so the limit can be narrowed without disturbing the payload;
                // narrowing keeps put() from overflowing outBuffer when the payload is larger.
                ByteBuffer window = payload.duplicate();
                if (copyLength < window.remaining()) {
                    window.limit(window.position() + copyLength);
                }
                outBuffer.put(window);

                // Advance the payload past the bytes that were just copied.
                payload.position(window.limit());

                if (payload.hasRemaining()) {
                    // Partially consumed: put it back at the front for the next pass or call.
                    eventBuffer.push(event);
                } else {
                    // Fully consumed: one fewer outstanding item.
                    pending--;
                }
                break;
            }
            case COMPLETE: {
                // Keep the terminal event queued so that a later call to this method, made after
                // the stream has already finished, takes it off the queue again.
                eventBuffer.push(event);
                pending = 0;
                return true;
            }
            case ERROR: {
                // Keep the terminal event queued so that a later call to this method, made after
                // the stream has already failed, takes it off the queue again.
                eventBuffer.push(event);
                Throwable failure = ((ErrorEvent) event).error();
                throw failure instanceof RuntimeException
                      ? (RuntimeException) failure
                      : new RuntimeException(failure);
            }
            default:
                // Guards against event types added later that this loop does not handle.
                throw new IllegalStateException("Unknown event type: " + event.type());
        }
    }

    // The buffer is full and no terminal event has been consumed yet.
    return false;
}