in mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservableConnectionHandler.java [213:314]
private <K, V> Subscription serveGroupedObservable(
final Observable<Group<K, V>> groups,
final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection,
final RemoteRxEvent event,
final Func1<Map<String, String>, Func1<K, Boolean>> filterFunction,
final Encoder<K> keyEncoder,
final Encoder<V> valueEncoder,
final ServeGroupedObservable<K, V> serveConfig,
final WritableEndpoint<GroupedObservable<K, V>> endpoint) {
final MutableReference<Subscription> subReference = new MutableReference<>();
subReference.setValue(groups
// filter out groups based on subscription parameters
.filter(new Func1<Group<K, V>, Boolean>() {
@Override
public Boolean call(Group<K, V> group) {
return filterFunction.call(event.getSubscribeParameters())
.call(group.getKeyValue());
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
logger.info("OnCompleted recieved in serveGroupedObservable, sending to client.");
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable t1) {
logger.info("OnError received in serveGroupedObservable, sending to client: ", t1);
}
})
.materialize()
.lift(new DisableBackPressureOperator<Notification<Group<K, V>>>())
.buffer(writeBufferTimeMSec, TimeUnit.MILLISECONDS)
.filter(new Func1<List<Notification<Group<K, V>>>, Boolean>() {
@Override
public Boolean call(List<Notification<Group<K, V>>> t1) {
return t1 != null && !t1.isEmpty();
}
})
.map(new Func1<List<Notification<Group<K, V>>>, List<RemoteRxEvent>>() {
@Override
public List<RemoteRxEvent> call(final List<Notification<Group<K, V>>> groupNotifications) {
List<RemoteRxEvent> rxEvents = new ArrayList<RemoteRxEvent>(groupNotifications.size());
for (Notification<Group<K, V>> groupNotification : groupNotifications) {
if (Kind.OnNext == groupNotification.getKind()) {
// encode inner group notification
Group<K, V> group = groupNotification.getValue();
final int keyLength = group.getKeyBytes().length;
Notification<V> notification = groupNotification.getValue().getNotification();
byte[] data = null;
if (Kind.OnNext == notification.getKind()) {
V value = notification.getValue();
byte[] valueBytes = valueEncoder.encode(value);
// 1 byte for notification type,
// 4 bytes is to encode key length as int
data = ByteBuffer.allocate(1 + 4 + keyLength + valueBytes.length)
.put((byte) 1)
.putInt(keyLength)
.put(group.getKeyBytes())
.put(valueBytes)
.array();
} else if (Kind.OnCompleted == notification.getKind()) {
data = ByteBuffer.allocate(1 + 4 + keyLength)
.put((byte) 2)
.putInt(keyLength)
.put(group.getKeyBytes())
.array();
} else if (Kind.OnError == notification.getKind()) {
Throwable error = notification.getThrowable();
byte[] errorBytes = RemoteObservable.fromThrowableToBytes(error);
data = ByteBuffer.allocate(1 + 4 + keyLength + errorBytes.length)
.put((byte) 3)
.putInt(keyLength)
.put(group.getKeyBytes())
.put(errorBytes)
.array();
}
rxEvents.add(RemoteRxEvent.next(event.getName(), data));
} else if (Kind.OnCompleted == groupNotification.getKind()) {
rxEvents.add(RemoteRxEvent.completed(event.getName()));
} else if (Kind.OnError == groupNotification.getKind()) {
rxEvents.add(RemoteRxEvent.error(event.getName(),
RemoteObservable.fromThrowableToBytes(groupNotification.getThrowable())));
} else {
throw new RuntimeException("Unsupported notification type: " + groupNotification.getKind());
}
}
return rxEvents;
}
})
.filter(new Func1<List<RemoteRxEvent>, Boolean>() {
@Override
public Boolean call(List<RemoteRxEvent> t1) {
return t1 != null && !t1.isEmpty();
}
})
.subscribe(new WriteBytesObserver(connection, subReference, serverMetrics,
serveConfig.getSlottingStrategy(), endpoint)));
return subReference.getValue();
}