in pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java [162:172]
public ReactiveMessagePipeline start() {
	// Starts the pipeline by subscribing to it and recording the resulting
	// subscription handle in the kill switch. Returns this instance on success and
	// throws IllegalStateException when a handle is already recorded.

	// Cheap up-front rejection: a non-null kill switch means a subscription handle
	// has already been stored, so avoid subscribing a second time.
	boolean alreadyStarted = this.killSwitch.get() != null;
	if (alreadyStarted) {
		throw new IllegalStateException("Message handler is already running.");
	}

	// No value consumer is supplied (null); errors are routed to logError and
	// completion to logUnexpectedCompletion.
	Disposable subscription = this.pipeline.subscribe(null, this::logError, this::logUnexpectedCompletion);

	// Publish the handle only if the kill switch is still empty.
	if (this.killSwitch.compareAndSet(null, subscription)) {
		return this;
	}

	// The kill switch was filled between the check above and the compareAndSet
	// (presumably by a concurrent start() call - confirm against callers). Tear down
	// the subscription just created so it is not left running without a handle.
	subscription.dispose();
	throw new IllegalStateException("Message handler was already running.");
}