public boolean sendRequestBody()

in services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3CrtRequestBodyStreamAdapter.java [62:143]


    public boolean sendRequestBody(ByteBuffer outBuffer) {
        LOG.trace(() -> "Getting data to fill buffer of size " + outBuffer.remaining());

        // Per the spec, onSubscribe is always called before any other
        // signal, so we expect a subscription to always be provided; we just
        // wait for that to happen
        waitForSubscription();

        // The "event loop". Per the spec, the sequence of events is "onSubscribe onNext* (onError | onComplete)?".
        // We don't handle onSubscribe as a discrete event; instead we only enter this loop once we have a
        // subscription.
        //
        // This works by requesting and consuming DATA events until we fill the buffer. We return from the method if
        // we encounter either of the terminal events, COMPLETE or ERROR.
        while (true) {
            // The supplier API requires that we fill the buffer entirely.
            if (!outBuffer.hasRemaining()) {
                break;
            }

            if (eventBuffer.isEmpty() && pending == 0) {
                pending = DEFAULT_REQUEST_SIZE;
                subscription.request(pending);
            }

            Event ev = takeFirstEvent();

            // Discard the event if it's not for the current subscriber
            if (!ev.subscriber().equals(subscriber)) {
                LOG.debug(() -> "Received an event for a previous publisher. Discarding. Event was: " + ev);
                continue;
            }

            switch (ev.type()) {
                case DATA:
                    ByteBuffer srcBuffer = ((DataEvent) ev).data();

                    ByteBuffer bufferToWrite = srcBuffer.duplicate();
                    int nBytesToWrite = Math.min(outBuffer.remaining(), srcBuffer.remaining());

                    // src is larger, create a resized view to prevent
                    // buffer overflow in the subsequent put() call
                    if (bufferToWrite.remaining() > nBytesToWrite) {
                        bufferToWrite.limit(bufferToWrite.position() + nBytesToWrite);
                    }

                    outBuffer.put(bufferToWrite);
                    srcBuffer.position(bufferToWrite.limit());

                    if (!srcBuffer.hasRemaining()) {
                        --pending;
                    } else {
                        eventBuffer.push(ev);
                    }

                    break;

                case COMPLETE:
                    // Leave this event in the queue so that if getRequestData
                    // gets call after the stream is already done, we pop it off again.
                    eventBuffer.push(ev);
                    pending = 0;
                    return true;

                case ERROR:
                    // Leave this event in the queue so that if getRequestData
                    // gets call after the stream is already done, we pop it off again.
                    eventBuffer.push(ev);
                    Throwable t = ((ErrorEvent) ev).error();
                    if (t instanceof RuntimeException) {
                        throw (RuntimeException) t;
                    }
                    throw new RuntimeException(t);

                default:
                    // In case new event types are introduced that this loop doesn't account for
                    throw new IllegalStateException("Unknown event type: " + ev.type());
            }
        }

        return false;
    }