public void onSuccess()

in servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java [148:390]


        public void onSuccess(@Nullable final StreamingHttpResponse response) {
            // Handles the response meta-data signal. Three outcomes are possible:
            //  1. response == null: handling is delegated to sendNullResponse().
            //  2. The IDLE -> PROCESSING_PAYLOAD transition succeeds: the response is forwarded downstream with its
            //     message body wrapped, so that beforeFinally is invoked at most once, by whichever of
            //     cancel / onError / onComplete first moves the state to TERMINATED.
            //  3. The transition fails: the state is expected to be TERMINATED already (see the assert below), i.e.
            //     the response arrived after cancellation. Its message body is subscribed and cancelled
            //     immediately so that it never remains unconsumed.
            if (response == null) {
                sendNullResponse();
            } else if (stateUpdater.compareAndSet(this, IDLE, PROCESSING_PAYLOAD)) {
                subscriber.onSuccess(response.transformMessageBody(payload ->
                        payload.liftSync(subscriber ->
                                new Subscriber<Object>() {
                                    // Upstream subscription of the message body. Retained so that onNext and
                                    // cancel0 can propagate a cancel that had to be deferred.
                                    @Nullable
                                    private Subscription subscription;

                                    @Override
                                    public void onSubscribe(final Subscription subscription) {
                                        this.subscription = subscription;
                                        subscriber.onSubscribe(new Subscription() {
                                            @Override
                                            public void request(final long n) {
                                                // Demand is passed through untouched.
                                                subscription.request(n);
                                            }

                                            @Override
                                            public void cancel() {
                                                if (!discardEventsAfterCancel) {
                                                    // Simple mode: notify beforeFinally only if this call wins the
                                                    // PROCESSING_PAYLOAD -> TERMINATED transition, but always
                                                    // propagate cancel upstream.
                                                    try {
                                                        if (stateUpdater.compareAndSet(
                                                                ResponseCompletionSubscriber.this,
                                                                PROCESSING_PAYLOAD, TERMINATED)) {
                                                            beforeFinally.cancel();
                                                        }
                                                    } finally {
                                                        subscription.cancel();
                                                    }
                                                    return;
                                                }

                                                // Discard mode: retry until one of the transitions below succeeds
                                                // or a state that requires no transition is observed.
                                                for (;;) {
                                                    final int state = ResponseCompletionSubscriber.this.state;
                                                    assert state != IDLE;
                                                    if (state == PROCESSING_PAYLOAD) {
                                                        // No onNext delivery in progress: terminate right away.
                                                        if (stateUpdater.compareAndSet(
                                                                ResponseCompletionSubscriber.this,
                                                                PROCESSING_PAYLOAD, TERMINATED)) {
                                                            try {
                                                                beforeFinally.cancel();
                                                            } finally {
                                                                subscription.cancel();
                                                            }
                                                            break;
                                                        }
                                                    } else if (state == DELIVERING_PAYLOAD) {
                                                        // An onNext delivery is in progress: only record the cancel.
                                                        // onNext (or a terminal signal) completes it afterwards.
                                                        if (stateUpdater.compareAndSet(
                                                                ResponseCompletionSubscriber.this,
                                                                DELIVERING_PAYLOAD, AWAITING_CANCEL)) {
                                                            break;
                                                        }
                                                    } else if (state == TERMINATED) {
                                                        // still propagate cancel to the original subscription:
                                                        subscription.cancel();
                                                        break;
                                                    } else {
                                                        // cancel can be invoked multiple times
                                                        assert state == AWAITING_CANCEL;
                                                        break;
                                                    }
                                                }
                                            }
                                        });
                                    }

                                    @Override
                                    public void onNext(@Nullable final Object o) {
                                        if (!discardEventsAfterCancel) {
                                            // Simple mode: items are always forwarded.
                                            subscriber.onNext(o);
                                            return;
                                        }

                                        // Discard mode: mark the delivery as in progress by moving
                                        // PROCESSING_PAYLOAD -> DELIVERING_PAYLOAD. If a delivery is already marked
                                        // (DELIVERING_PAYLOAD or AWAITING_CANCEL) this call is treated as re-entry
                                        // and must not reset the state once it is done.
                                        boolean reentry = false;
                                        for (;;) {
                                            final int state = ResponseCompletionSubscriber.this.state;
                                            assert state != IDLE;
                                            if (state == TERMINATED) {
                                                // We already cancelled and have to discard further events
                                                return;
                                            }
                                            if (state == DELIVERING_PAYLOAD || state == AWAITING_CANCEL) {
                                                reentry = true;
                                                break;
                                            }
                                            if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this,
                                                    PROCESSING_PAYLOAD, DELIVERING_PAYLOAD)) {
                                                break;
                                            }
                                        }

                                        try {
                                            subscriber.onNext(o);
                                        } finally {
                                            // Re-entry -> don't unlock
                                            if (!reentry) {
                                                for (;;) {
                                                    final int state = ResponseCompletionSubscriber.this.state;
                                                    assert state != IDLE;
                                                    assert state != PROCESSING_PAYLOAD;
                                                    if (state == TERMINATED) {
                                                        // A terminal signal was processed during the delivery.
                                                        break;
                                                    }
                                                    if (state == DELIVERING_PAYLOAD) {
                                                        // Nothing happened during delivery: release the marker.
                                                        if (stateUpdater.compareAndSet(
                                                                ResponseCompletionSubscriber.this,
                                                                DELIVERING_PAYLOAD, PROCESSING_PAYLOAD)) {
                                                            break;
                                                        }
                                                    } else if (stateUpdater.compareAndSet(
                                                            ResponseCompletionSubscriber.this,
                                                            AWAITING_CANCEL, TERMINATED)) {
                                                        // cancel() was requested during delivery and deferred:
                                                        // complete it now.
                                                        try {
                                                            beforeFinally.cancel();
                                                        } finally {
                                                            assert subscription != null;
                                                            subscription.cancel();
                                                        }
                                                        break;
                                                    }
                                                }
                                            }
                                        }
                                    }

                                    @Override
                                    public void onError(final Throwable t) {
                                        if (!discardEventsAfterCancel) {
                                            // Simple mode: the error is always forwarded. beforeFinally is notified
                                            // only if this call wins the transition to TERMINATED, and anything it
                                            // throws is attached to t as a suppressed exception.
                                            try {
                                                if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this,
                                                        PROCESSING_PAYLOAD, TERMINATED)) {
                                                    beforeFinally.onError(t);
                                                }
                                            } catch (Throwable cause) {
                                                t.addSuppressed(cause);
                                            }
                                            subscriber.onError(t);
                                            return;
                                        }

                                        final int prevState = setTerminalState();
                                        if (prevState == TERMINATED) {
                                            // We already cancelled and have to discard further events
                                            return;
                                        }
                                        // Propagate original cancel to let Subscription observe it
                                        final boolean propagateCancel = prevState == AWAITING_CANCEL;

                                        try {
                                            beforeFinally.onError(t);
                                        } catch (Throwable cause) {
                                            t.addSuppressed(cause);
                                        }
                                        try {
                                            subscriber.onError(t);
                                        } finally {
                                            cancel0(propagateCancel);
                                        }
                                    }

                                    @Override
                                    public void onComplete() {
                                        if (!discardEventsAfterCancel) {
                                            // Simple mode: if beforeFinally throws, downstream receives onError
                                            // with that failure instead of onComplete.
                                            try {
                                                if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this,
                                                        PROCESSING_PAYLOAD, TERMINATED)) {
                                                    beforeFinally.onComplete();
                                                }
                                            } catch (Throwable cause) {
                                                subscriber.onError(cause);
                                                return;
                                            }
                                            subscriber.onComplete();
                                            return;
                                        }

                                        final int prevState = setTerminalState();
                                        if (prevState == TERMINATED) {
                                            // We already cancelled and have to discard further events
                                            return;
                                        }
                                        // Propagate original cancel to let Subscription observe it
                                        final boolean propagateCancel = prevState == AWAITING_CANCEL;

                                        try {
                                            try {
                                                beforeFinally.onComplete();
                                            } catch (Throwable cause) {
                                                subscriber.onError(cause);
                                                return;
                                            }
                                            subscriber.onComplete();
                                        } finally {
                                            // Runs on both exits above, including the early return.
                                            cancel0(propagateCancel);
                                        }
                                    }

                                    // Moves the state to TERMINATED (unless it already is) and returns the state
                                    // that was observed immediately before, so that callers can tell whether the
                                    // signal must be discarded (TERMINATED) or whether a deferred cancel is still
                                    // pending (AWAITING_CANCEL).
                                    private int setTerminalState() {
                                        for (;;) {
                                            final int state = ResponseCompletionSubscriber.this.state;
                                            assert state != IDLE;
                                            if (state == TERMINATED) {
                                                // We already cancelled and have to discard further events
                                                return state;
                                            }
                                            if (state == PROCESSING_PAYLOAD) {
                                                if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this,
                                                        PROCESSING_PAYLOAD, TERMINATED)) {
                                                    return state;
                                                }
                                            } else if (stateUpdater.compareAndSet(ResponseCompletionSubscriber.this,
                                                    state, TERMINATED)) {
                                                // re-entry, but we can terminate because this is a final event:
                                                return state;
                                            }
                                        }
                                    }

                                    // Cancels the upstream subscription when a cancel was deferred while an onNext
                                    // delivery was in progress; otherwise does nothing.
                                    private void cancel0(final boolean propagateCancel) {
                                        if (propagateCancel) {
                                            assert subscription != null;
                                            subscription.cancel();
                                        }
                                    }
                                })
                ));
            } else {
                // Invoking a terminal method multiple times is not allowed by the RS spec, so we assume we have been
                // cancelled.
                assert state == TERMINATED;
                if (discardEventsAfterCancel) {
                    // The response is not delivered downstream, so there is no one else that will ever subscribe to
                    // its message body. Subscribe and cancel the content here so that we do not hold up the
                    // connection.
                    toSource(response.messageBody()).subscribe(CancelImmediatelySubscriber.INSTANCE);
                    return;
                }
                subscriber.onSuccess(response.transformMessageBody(payload -> {
                    // We have been cancelled. Subscribe and cancel the content so that we do not hold up the
                    // connection and indicate that there is no one else that will subscribe.
                    toSource(payload).subscribe(CancelImmediatelySubscriber.INSTANCE);
                    return Publisher.failed(new CancellationException("Received response post cancel."));
                }));
            }
        }