in core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/io/InMemoryPublisher.java [43:124]
/**
 * Subscribes {@code s} to the buffers held by this publisher.
 *
 * <p>Only one subscription is ever served. Any later subscriber is handed a no-op subscription and is then
 * immediately failed with an {@link IllegalStateException}.
 *
 * <p>The subscription emits the buffers in order, one per unit of outstanding demand, and signals
 * {@code onComplete} once the last buffer has been delivered. Completion is only signalled from within a
 * {@code request} call, so a publisher with no data completes on the first request. Each buffer reference is
 * cleared from the backing list after it has been handed to the subscriber.
 *
 * @param s the subscriber to feed
 * @throws NullPointerException if {@code s} is null
 */
public void subscribe(Subscriber<? super ByteBuffer> s) {
    // Reject null before claiming the single subscription slot, so a bad call does not make the
    // publisher unusable for a later, valid subscriber.
    if (s == null) {
        throw new NullPointerException("Subscriber must not be null.");
    }

    if (!subscribed.compareAndSet(false, true)) {
        s.onSubscribe(new NoOpSubscription());
        s.onError(new IllegalStateException("InMemoryPublisher cannot be subscribed to twice."));
        return;
    }

    s.onSubscribe(new Subscription() {
        // True while some thread is inside send(); ensures a single emitter at a time.
        private final AtomicBoolean sending = new AtomicBoolean(false);
        private final Object doneLock = new Object();
        // Set once the subscription has completed, failed or been cancelled.
        private final AtomicBoolean done = new AtomicBoolean(false);
        // Outstanding requested-but-not-yet-delivered element count, saturating at Long.MAX_VALUE.
        private final AtomicLong demand = new AtomicLong(0);
        // Index of the next buffer to emit; only read and written by the thread that owns 'sending'.
        private int position = 0;

        @Override
        public void request(long n) {
            if (done.get()) {
                return;
            }

            try {
                if (n <= 0) {
                    // Reactive Streams rule 3.9: a non-positive request must be answered with onError.
                    // Without this check a negative n would push 'demand' below zero and defeat backpressure.
                    finish(() -> s.onError(new IllegalArgumentException(
                        "Subscription.request must be called with a positive number, but was: " + n)));
                    return;
                }

                addDemand(n);
                fulfillDemand();
            } catch (Throwable t) {
                finish(() -> s.onError(t));
            }
        }

        /**
         * Adds {@code n} to the outstanding demand, capping at {@link Long#MAX_VALUE} instead of overflowing.
         */
        private void addDemand(long n) {
            demand.accumulateAndGet(n, (current, added) -> {
                long sum = current + added;
                // Both operands are non-negative here, so a negative sum can only mean overflow.
                return sum < 0 ? Long.MAX_VALUE : sum;
            });
        }

        /**
         * Emits buffers while there is demand, unless another caller is already emitting.
         */
        private void fulfillDemand() {
            do {
                if (!sending.compareAndSet(false, true)) {
                    // Someone is already emitting: either another thread, or this very thread further up
                    // the stack (request() called from inside onNext). Demand was recorded before this
                    // point and the current sender re-checks it after releasing 'sending', so it is safe
                    // to return. Spinning here would never terminate in the re-entrant case.
                    return;
                }

                try {
                    send();
                } finally {
                    sending.set(false);
                }
                // Re-check after releasing the flag: demand may have arrived from a caller whose CAS failed.
            } while (!done.get() && demand.get() > 0);
        }

        /**
         * Delivers buffers until the subscription is done, the data is exhausted or demand runs out.
         * Must only be called by the owner of {@code sending}.
         */
        private void send() {
            while (true) {
                assert position >= 0;
                assert position <= data.size();

                if (done.get()) {
                    break;
                }

                if (position == data.size()) {
                    finish(s::onComplete);
                    break;
                }

                if (demand.get() == 0) {
                    break;
                }

                // Only the sender decrements and everyone else only adds, so the check above
                // guarantees this cannot take demand below zero.
                demand.decrementAndGet();
                int dataIndex = position;
                s.onNext(data.get(dataIndex));
                data.set(dataIndex, null); // We're done with this data here, so allow it to be garbage collected
                position++;
            }
        }

        @Override
        public void cancel() {
            // Nothing to signal on cancellation; just mark the subscription as done.
            finish(() -> {
            });
        }

        /**
         * Marks the subscription as done and runs {@code thingToDo}, but only for the first caller.
         * Later callers are no-ops; concurrent callers block on the lock until the first one has finished.
         */
        private void finish(Runnable thingToDo) {
            synchronized (doneLock) {
                if (done.compareAndSet(false, true)) {
                    thingToDo.run();
                }
            }
        }
    });
}