in mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/BatchedRxEventPipelineConfigurator.java [91:208]
public void configureNewPipeline(ChannelPipeline pipeline) {
pipeline.addLast(new ChannelDuplexHandler() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
boolean handled = false;
if (ByteBuf.class.isAssignableFrom(msg.getClass())) {
ByteBuf byteBuf = (ByteBuf) msg;
if (byteBuf.isReadable()) {
int protocolVersion = byteBuf.readByte();
if (protocolVersion != PROTOCOL_VERSION) {
throw new RuntimeException("Unsupported protocol version: " + protocolVersion);
}
int observableNameLength = byteBuf.readByte();
String observableName = null;
if (observableNameLength > 0) {
// read name
byte[] observableNameBytes = new byte[observableNameLength];
byteBuf.readBytes(observableNameBytes);
observableName = new String(observableNameBytes, Charset.forName("UTF-8"));
}
while (byteBuf.isReadable()) {
int lengthOfEvent = byteBuf.readInt();
int operation = byteBuf.readByte();
RemoteRxEvent.Type type = null;
Map<String, String> subscribeParams = null;
byte[] valueData = null;
if (operation == 1) {
// if(logger.isDebugEnabled()) {
// logger.debug("READ request for RemoteRxEvent: next");
// }
type = RemoteRxEvent.Type.next;
valueData = new byte[lengthOfEvent - 1]; //subtract op code
byteBuf.readBytes(valueData);
} else if (operation == 2) {
// if(logger.isDebugEnabled()) {
// logger.debug("READ request for RemoteRxEvent: error");
// }
type = RemoteRxEvent.Type.error;
valueData = new byte[lengthOfEvent - 1];
byteBuf.readBytes(valueData);
} else if (operation == 3) {
// if(logger.isDebugEnabled()) {
// logger.debug("READ request for RemoteRxEvent: completed");
// }
type = RemoteRxEvent.Type.completed;
} else if (operation == 4) {
// if(logger.isDebugEnabled()) {
// logger.debug("READ request for RemoteRxEvent: subscribed");
// }
type = RemoteRxEvent.Type.subscribed;
// read subscribe parameters
int subscribeParamsLength = byteBuf.readInt();
if (subscribeParamsLength > 0) {
// read byte into map
byte[] subscribeParamsBytes = new byte[subscribeParamsLength];
byteBuf.readBytes(subscribeParamsBytes);
subscribeParams = fromBytesToMap(subscribeParamsBytes);
}
} else if (operation == 5) {
// if(logger.isDebugEnabled()) {
// logger.debug("READ request for RemoteRxEvent: unsubscribed");
// }
type = RemoteRxEvent.Type.unsubscribed;
} else if (operation == 6) {
// if(logger.isDebugEnabled()) {
// logger.debug("READ request for RemoteRxEvent: heartbeat");
// }
} else if (operation == 7) {
// if(logger.isDebugEnabled()) {
// logger.debug("READ request for RemoteRxEvent: nonDataError");
// }
type = RemoteRxEvent.Type.nonDataError;
valueData = new byte[lengthOfEvent - 1];
byteBuf.readBytes(valueData);
} else {
throw new RuntimeException("operation: " + operation + " not support.");
}
// don't send heartbeats through pipeline
if (operation != 6) {
ctx.fireChannelRead(new RemoteRxEvent(observableName, type, valueData, subscribeParams));
}
}
handled = true;
byteBuf.release();
}
}
if (!handled) {
super.channelRead(ctx, msg);
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception {
if (msg instanceof List) {
@SuppressWarnings("unchecked")
List<RemoteRxEvent> batch = (List<RemoteRxEvent>) msg;
ByteBuf buf = ctx.alloc().buffer();
writeHeader(buf, batch.get(0).getName());
for (RemoteRxEvent event : batch) {
writeBytesIntoBuf(event, buf);
}
super.write(ctx, buf, promise);
super.flush(ctx);
} else {
super.write(ctx, msg, promise);
}
}
});
}