autodispose/src/main/java/autodispose2/AutoDisposeEndConsumerHelper.java (44 lines of code) (raw):

/* * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing permissions and * limitations under the License. */ package autodispose2; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.ProtocolViolationException; import io.reactivex.rxjava3.plugins.RxJavaPlugins; import java.util.concurrent.atomic.AtomicReference; import org.reactivestreams.Subscription; /** * Utility class to help report multiple subscriptions with the same * consumer type instead of the internal "Disposable already set!" message * that is practically reserved for internal operators and indicate bugs in them. * * Copied from the RxJava implementation. */ final class AutoDisposeEndConsumerHelper { /** * Utility class. */ private AutoDisposeEndConsumerHelper() { throw new IllegalStateException("No instances!"); } /** * Atomically updates the target upstream AtomicReference from null to the non-null * next Disposable, otherwise disposes next and reports a ProtocolViolationException * if the AtomicReference doesn't contain the shared disposed indicator. * * @param upstream the target AtomicReference to update * @param next the Disposable to set on it atomically * @param observer the class of the consumer to have a personalized * error message if the upstream already contains a non-cancelled Disposable. * @return true if successful, false if the content of the AtomicReference was non null */ public static boolean setOnce(AtomicReference<Disposable> upstream, Disposable next, Class<?> observer) { AutoDisposeUtil.checkNotNull(next, "next is null"); if (!upstream.compareAndSet(null, next)) { next.dispose(); if (upstream.get() != AutoDisposableHelper.DISPOSED) { reportDoubleSubscription(observer); } return false; } return true; } /** * Atomically updates the target upstream AtomicReference from null to the non-null * next Subscription, otherwise cancels next and reports a ProtocolViolationException * if the AtomicReference doesn't contain the shared cancelled indicator. * * @param upstream the target AtomicReference to update * @param next the Subscription to set on it atomically * @param subscriber the class of the consumer to have a personalized * error message if the upstream already contains a non-cancelled Subscription. * @return true if successful, false if the content of the AtomicReference was non null */ public static boolean setOnce(AtomicReference<Subscription> upstream, Subscription next, Class<?> subscriber) { AutoDisposeUtil.checkNotNull(next, "next is null"); if (!upstream.compareAndSet(null, next)) { next.cancel(); if (upstream.get() != AutoSubscriptionHelper.CANCELLED) { reportDoubleSubscription(subscriber); } return false; } return true; } /** * Builds the error message with the consumer class. * * @param consumer the class of the consumer * @return the error message string */ public static String composeMessage(String consumer) { return "It is not allowed to subscribe with a(n) " + consumer + " multiple times. " + "Please create a fresh instance of " + consumer + " and subscribe that to the target source instead."; } /** * Report a ProtocolViolationException with a personalized message referencing * the simple type name of the consumer class and report it via * RxJavaPlugins.onError. * * @param consumer the class of the consumer */ public static void reportDoubleSubscription(Class<?> consumer) { RxJavaPlugins.onError(new ProtocolViolationException(composeMessage(consumer.getName()))); } }