in mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservableConnectionHandler.java [80:145]
/**
 * Streams the given observable to a remote client as batches of {@link RemoteRxEvent}s.
 *
 * <p>The source is filtered with a predicate derived from the subscribe parameters carried by
 * {@code event}, materialized so that completion and error signals travel as ordinary items,
 * grouped into time-based batches, converted to wire events and finally handed to a
 * {@code WriteBytesObserver} created for {@code connection}.
 *
 * @param observable     source stream to serve
 * @param connection     connection passed to the {@code WriteBytesObserver}
 * @param event          subscribe request; supplies the filter parameters and the name stamped
 *                       on every outgoing event
 * @param filterFunction factory invoked with the subscribe parameters to obtain the item filter
 * @param encoder        encodes each {@code onNext} value for the outgoing event
 * @param serveConfig    supplies the slotting strategy passed to the {@code WriteBytesObserver}
 * @param endpoint       endpoint passed to the {@code WriteBytesObserver}
 * @param <T>            element type of the served observable
 * @return the subscription held by the internal reference after subscribing
 */
private <T> Subscription serveObservable(
        final Observable<T> observable,
        final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection,
        final RemoteRxEvent event,
        final Func1<Map<String, String>, Func1<T, Boolean>> filterFunction,
        final Encoder<T> encoder,
        final ServeObservable<T> serveConfig,
        final WritableEndpoint<Observable<T>> endpoint) {
    // Shared holder: the observer receives it at construction time, the subscription is stored
    // into it only once subscribe(...) has returned.
    // NOTE(review): if the observer reads this reference during a synchronous emission it would
    // see an unset value - confirm against WriteBytesObserver.
    final MutableReference<Subscription> subReference = new MutableReference<>();

    // Per-subscription item filter built from the parameters the client sent.
    final Func1<T, Boolean> clientFilter = filterFunction.call(event.getSubscribeParameters());

    final Action0 logCompletion = new Action0() {
        @Override
        public void call() {
            logger.info("OnCompleted recieved in serveObservable, sending to client.");
        }
    };

    final Action1<Throwable> logFailure = new Action1<Throwable>() {
        @Override
        public void call(Throwable error) {
            logger.info("OnError received in serveObservable, sending to client: ", error);
        }
    };

    // Drops null or empty notification batches.
    final Func1<List<Notification<T>>, Boolean> hasNotifications =
            new Func1<List<Notification<T>>, Boolean>() {
                @Override
                public Boolean call(List<Notification<T>> batch) {
                    return !(batch == null || batch.isEmpty());
                }
            };

    // Translates one batch of notifications into the matching wire events, one event per
    // notification, preserving order.
    final Func1<List<Notification<T>>, List<RemoteRxEvent>> toWireEvents =
            new Func1<List<Notification<T>>, List<RemoteRxEvent>>() {
                @Override
                public List<RemoteRxEvent> call(List<Notification<T>> batch) {
                    final List<RemoteRxEvent> converted =
                            new ArrayList<RemoteRxEvent>(batch.size());
                    for (Notification<T> item : batch) {
                        final Notification.Kind kind = item.getKind();
                        final RemoteRxEvent wireEvent;
                        if (kind == Notification.Kind.OnNext) {
                            wireEvent = RemoteRxEvent.next(
                                    event.getName(), encoder.encode(item.getValue()));
                        } else if (kind == Notification.Kind.OnError) {
                            wireEvent = RemoteRxEvent.error(
                                    event.getName(),
                                    RemoteObservable.fromThrowableToBytes(item.getThrowable()));
                        } else if (kind == Notification.Kind.OnCompleted) {
                            wireEvent = RemoteRxEvent.completed(event.getName());
                        } else {
                            throw new RuntimeException("Unsupported notification kind: " + kind);
                        }
                        converted.add(wireEvent);
                    }
                    return converted;
                }
            };

    // Drops null or empty event batches before they reach the writer.
    final Func1<List<RemoteRxEvent>, Boolean> hasWireEvents =
            new Func1<List<RemoteRxEvent>, Boolean>() {
                @Override
                public Boolean call(List<RemoteRxEvent> events) {
                    return !(events == null || events.isEmpty());
                }
            };

    final Observable<List<RemoteRxEvent>> outbound = observable
            .filter(clientFilter)
            .doOnCompleted(logCompletion)
            .doOnError(logFailure)
            // Turn onNext/onError/onCompleted into Notification items so terminal signals can
            // be batched and sent to the client like data.
            .materialize()
            // Custom operator; by its name it presumably disables backpressure - confirm.
            .lift(new DisableBackPressureOperator<Notification<T>>())
            // Collect notifications into batches of writeBufferTimeMSec milliseconds.
            .buffer(writeBufferTimeMSec, TimeUnit.MILLISECONDS)
            .filter(hasNotifications)
            .map(toWireEvents)
            .filter(hasWireEvents);

    subReference.setValue(
            outbound.subscribe(new WriteBytesObserver(connection, subReference, serverMetrics,
                    serveConfig.getSlottingStrategy(), endpoint)));
    return subReference.getValue();
}