public void subscribe()

in core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/io/InMemoryPublisher.java [43:124]


    public void subscribe(Subscriber<? super ByteBuffer> s) {
        if (!subscribed.compareAndSet(false, true)) {
            s.onSubscribe(new NoOpSubscription());
            s.onError(new IllegalStateException("InMemoryPublisher cannot be subscribed to twice."));
            return;
        }

        s.onSubscribe(new Subscription() {
            private final AtomicBoolean sending = new AtomicBoolean(false);

            private final Object doneLock = new Object();
            private final AtomicBoolean done = new AtomicBoolean(false);
            private final AtomicLong demand = new AtomicLong(0);
            private int position = 0;

            @Override
            public void request(long n) {
                if (done.get()) {
                    return;
                }

                try {
                    demand.addAndGet(n);
                    fulfillDemand();
                } catch (Throwable t) {
                    finish(() -> s.onError(t));
                }
            }

            private void fulfillDemand() {
                do {
                    if (sending.compareAndSet(false, true)) {
                        try {
                            send();
                        } finally {
                            sending.set(false);
                        }
                    }
                } while (!done.get() && demand.get() > 0);
            }

            private void send() {
                while (true) {
                    assert position >= 0;
                    assert position <= data.size();

                    if (done.get()) {
                        break;
                    }

                    if (position == data.size()) {
                        finish(s::onComplete);
                        break;
                    }

                    if (demand.get() == 0) {
                        break;
                    }

                    demand.decrementAndGet();
                    int dataIndex = position;
                    s.onNext(data.get(dataIndex));
                    data.set(dataIndex, null); // We're done with this data here, so allow it to be garbage collected
                    position++;
                }
            }

            @Override
            public void cancel() {
                finish(() -> {
                });
            }

            private void finish(Runnable thingToDo) {
                synchronized (doneLock) {
                    if (done.compareAndSet(false, true)) {
                        thingToDo.run();
                    }
                }
            }
        });
    }