in servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java [148:390]
/**
 * Handles the response signal and arranges for the {@code beforeFinally} callbacks to be tied to the
 * termination of the response message body.
 * <p>
 * Three cases are handled:
 * <ul>
 * <li>{@code response == null}: delegated to {@code sendNullResponse()}.</li>
 * <li>The state moves {@code IDLE -> PROCESSING_PAYLOAD}: the response is forwarded downstream with its
 * message body wrapped by a {@code Subscriber} that invokes {@code beforeFinally.cancel()},
 * {@code beforeFinally.onError(Throwable)} or {@code beforeFinally.onComplete()} only after winning a
 * transition of the state to {@code TERMINATED}.</li>
 * <li>The state was not {@code IDLE}: the state is asserted to be {@code TERMINATED}; the response is either
 * dropped (when {@code discardEventsAfterCancel} is set) or forwarded with a message body that fails with a
 * {@link CancellationException}.</li>
 * </ul>
 * When {@code discardEventsAfterCancel} is set, the payload wrapper additionally uses the
 * {@code DELIVERING_PAYLOAD} and {@code AWAITING_CANCEL} states so that a cancel arriving while
 * {@code onNext} is being delivered is deferred until that delivery returns, and payload signals observed
 * after {@code TERMINATED} are discarded.
 *
 * @param response the response to process, may be {@code null}.
 */
public void onSuccess(@Nullable final StreamingHttpResponse response) {
if (response == null) {
sendNullResponse();
} else if (stateUpdater.compareAndSet(this, IDLE, PROCESSING_PAYLOAD)) {
// We own the IDLE -> PROCESSING_PAYLOAD transition: forward the response with an instrumented payload.
subscriber.onSuccess(response.transformMessageBody(payload ->
// NOTE: the lambda parameter "subscriber" below is the payload subscriber; it shadows the outer
// "subscriber" that receives the response itself.
payload.liftSync(subscriber ->
new Subscriber<Object>() {
// Kept as a field so that onNext/onError/onComplete can propagate a deferred cancel.
@Nullable
private Subscription subscription;
@Override
public void onSubscribe(final Subscription subscription) {
this.subscription = subscription;
// Inside the wrapper below "subscription" resolves to this method's parameter, not to the nullable field.
subscriber.onSubscribe(new Subscription() {
@Override
public void request(final long n) {
subscription.request(n);
}
@Override
public void cancel() {
if (!discardEventsAfterCancel) {
// Simple mode: run beforeFinally.cancel() only if this call wins the transition to TERMINATED, and
// always propagate the cancel upstream, even if the callback throws.
try {
if (stateUpdater.compareAndSet(
ResponseCompletionSubscriber.this,
PROCESSING_PAYLOAD, TERMINATED)) {
beforeFinally.cancel();
}
} finally {
subscription.cancel();
}
return;
}
// Discard mode: loop until one of the state-specific actions succeeds (a failed CAS re-reads the state).
for (;;) {
final int state = ResponseCompletionSubscriber.this.state;
assert state != IDLE;
if (state == PROCESSING_PAYLOAD) {
// No onNext delivery is in flight: terminate now and propagate the cancel upstream.
if (stateUpdater.compareAndSet(
ResponseCompletionSubscriber.this,
PROCESSING_PAYLOAD, TERMINATED)) {
try {
beforeFinally.cancel();
} finally {
subscription.cancel();
}
break;
}
} else if (state == DELIVERING_PAYLOAD) {
// An onNext delivery is in flight: only record the cancel. It is carried out by the finally block of
// onNext, or propagated via cancel0(...) if a terminal signal arrives first.
if (stateUpdater.compareAndSet(
ResponseCompletionSubscriber.this,
DELIVERING_PAYLOAD, AWAITING_CANCEL)) {
break;
}
} else if (state == TERMINATED) {
// still propagate cancel to the original subscription:
subscription.cancel();
break;
} else {
// cancel can be invoked multiple times
assert state == AWAITING_CANCEL;
break;
}
}
}
});
}
@Override
public void onNext(@Nullable final Object o) {
if (!discardEventsAfterCancel) {
// Simple mode: items are passed through without touching the state.
subscriber.onNext(o);
return;
}
// Discard mode: mark the delivery as in progress (PROCESSING_PAYLOAD -> DELIVERING_PAYLOAD) unless the
// state shows that a delivery is already in progress, which is treated as re-entry.
boolean reentry = false;
for (;;) {
final int state = ResponseCompletionSubscriber.this.state;
assert state != IDLE;
if (state == TERMINATED) {
// We already cancelled and have to discard further events
return;
}
if (state == DELIVERING_PAYLOAD || state == AWAITING_CANCEL) {
reentry = true;
break;
}
if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this,
PROCESSING_PAYLOAD, DELIVERING_PAYLOAD)) {
break;
}
}
try {
subscriber.onNext(o);
} finally {
// Re-entry -> don't unlock
if (!reentry) {
// Leave the "delivering" state: either go back to PROCESSING_PAYLOAD, or, if a cancel was recorded
// during the delivery (AWAITING_CANCEL), perform that cancel now.
for (;;) {
final int state = ResponseCompletionSubscriber.this.state;
assert state != IDLE;
assert state != PROCESSING_PAYLOAD;
if (state == TERMINATED) {
// A terminal signal was processed during the delivery; nothing left to restore.
break;
}
if (state == DELIVERING_PAYLOAD) {
if (stateUpdater.compareAndSet(
ResponseCompletionSubscriber.this,
DELIVERING_PAYLOAD, PROCESSING_PAYLOAD)) {
break;
}
} else if (stateUpdater.compareAndSet(
ResponseCompletionSubscriber.this,
AWAITING_CANCEL, TERMINATED)) {
// Deferred cancel: run the callback, then cancel upstream even if the callback throws.
try {
beforeFinally.cancel();
} finally {
// Here "subscription" is the nullable field assigned in onSubscribe.
assert subscription != null;
subscription.cancel();
}
break;
}
}
}
}
}
@Override
public void onError(final Throwable t) {
if (!discardEventsAfterCancel) {
// Simple mode: invoke beforeFinally.onError(t) only if this call wins the transition to TERMINATED; an
// exception thrown by the callback is attached to t as suppressed. The error is always forwarded.
try {
if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this,
PROCESSING_PAYLOAD, TERMINATED)) {
beforeFinally.onError(t);
}
} catch (Throwable cause) {
t.addSuppressed(cause);
}
subscriber.onError(t);
return;
}
// Discard mode: force the state to TERMINATED and act on the state observed before the transition.
final int prevState = setTerminalState();
if (prevState == TERMINATED) {
// We already cancelled and have to discard further events
return;
}
// Propagate original cancel to let Subscription observe it
final boolean propagateCancel = prevState == AWAITING_CANCEL;
try {
beforeFinally.onError(t);
} catch (Throwable cause) {
t.addSuppressed(cause);
}
try {
subscriber.onError(t);
} finally {
cancel0(propagateCancel);
}
}
@Override
public void onComplete() {
if (!discardEventsAfterCancel) {
// Simple mode: invoke beforeFinally.onComplete() only if this call wins the transition to TERMINATED. If
// the callback throws, the downstream subscriber receives onError(cause) instead of onComplete().
try {
if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this,
PROCESSING_PAYLOAD, TERMINATED)) {
beforeFinally.onComplete();
}
} catch (Throwable cause) {
subscriber.onError(cause);
return;
}
subscriber.onComplete();
return;
}
// Discard mode: force the state to TERMINATED and act on the state observed before the transition.
final int prevState = setTerminalState();
if (prevState == TERMINATED) {
// We already cancelled and have to discard further events
return;
}
// Propagate original cancel to let Subscription observe it
final boolean propagateCancel = prevState == AWAITING_CANCEL;
try {
try {
beforeFinally.onComplete();
} catch (Throwable cause) {
// A failing callback converts the completion into an error for the downstream subscriber.
subscriber.onError(cause);
return;
}
subscriber.onComplete();
} finally {
// Runs on both paths above, including the early return from the catch block.
cancel0(propagateCancel);
}
}
// Moves the state to TERMINATED from any other state and returns the state observed before the transition.
// Returns TERMINATED without modifying anything if the state was already TERMINATED.
private int setTerminalState() {
for (;;) {
final int state = ResponseCompletionSubscriber.this.state;
assert state != IDLE;
if (state == TERMINATED) {
// We already cancelled and have to discard further events
return state;
}
if (state == PROCESSING_PAYLOAD) {
if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this,
PROCESSING_PAYLOAD, TERMINATED)) {
return state;
}
} else if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this,
state, TERMINATED)) {
// re-entry, but we can terminate because this is a final event:
return state;
}
}
}
// Cancels the upstream subscription (the field assigned in onSubscribe) when a deferred cancel must be
// propagated; does nothing otherwise.
private void cancel0(final boolean propagateCancel) {
if (propagateCancel) {
assert subscription != null;
subscription.cancel();
}
}
})
));
} else {
// Invoking a terminal method multiple times is not allowed by the RS spec, so we assume we have been
// cancelled.
assert state == TERMINATED;
if (discardEventsAfterCancel) {
// Discard mode: the response is not forwarded to the subscriber.
return;
}
// Otherwise forward the response, but replace its message body with a failed Publisher.
subscriber.onSuccess(response.transformMessageBody(payload -> {
// We have been cancelled. Subscribe and cancel the content so that we do not hold up the
// connection and indicate that there is no one else that will subscribe.
toSource(payload).subscribe(CancelImmediatelySubscriber.INSTANCE);
return Publisher.failed(new CancellationException("Received response post cancel."));
}));
}
}