in mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservableConnectionHandler.java [148:211]
private <T> Subscription serveNestedObservable(
final Observable<T> observable,
final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection,
final RemoteRxEvent event,
final Func1<Map<String, String>, Func1<T, Boolean>> filterFunction,
final Encoder<T> encoder,
final ServeNestedObservable<Observable<T>> serveConfig,
final WritableEndpoint<Observable<T>> endpoint) {
final MutableReference<Subscription> subReference = new MutableReference<>();
subReference.setValue(
observable
.filter(filterFunction.call(event.getSubscribeParameters()))
.doOnCompleted(new Action0() {
@Override
public void call() {
logger.info("OnCompleted recieved in serveNestedObservable, sending to client.");
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable t1) {
logger.info("OnError received in serveNestedObservable, sending to client: ", t1);
}
})
.map(new Func1<T, byte[]>() {
@Override
public byte[] call(T t1) {
return encoder.encode(t1);
}
})
.materialize()
.map(new Func1<Notification<byte[]>, RemoteRxEvent>() {
@Override
public RemoteRxEvent call(Notification<byte[]> notification) {
if (notification.getKind() == Notification.Kind.OnNext) {
return RemoteRxEvent.next(event.getName(), notification.getValue());
} else if (notification.getKind() == Notification.Kind.OnError) {
return RemoteRxEvent.error(event.getName(), RemoteObservable.fromThrowableToBytes(notification.getThrowable()));
} else if (notification.getKind() == Notification.Kind.OnCompleted) {
return RemoteRxEvent.completed(event.getName());
} else {
throw new RuntimeException("Unsupported notification kind: " + notification.getKind());
}
}
})
.lift(new DisableBackPressureOperator<RemoteRxEvent>())
.buffer(writeBufferTimeMSec, TimeUnit.MILLISECONDS)
.filter(new Func1<List<RemoteRxEvent>, Boolean>() {
@Override
public Boolean call(List<RemoteRxEvent> t1) {
return t1 != null && !t1.isEmpty();
}
})
.filter(new Func1<List<RemoteRxEvent>, Boolean>() {
@Override
public Boolean call(List<RemoteRxEvent> t1) {
return t1 != null && !t1.isEmpty();
}
})
.subscribe(new WriteBytesObserver(connection, subReference, serverMetrics,
serveConfig.getSlottingStrategy(), endpoint)));
return subReference.getValue();
}