in mantis-network/src/main/java/io/reactivex/mantis/network/push/LegacyTcpPipelineConfigurator.java [97:221]
/**
 * Installs the legacy TCP codec on the given pipeline as two handlers.
 *
 * <p>The first handler decodes inbound {@link ByteBuf} messages into
 * {@link RemoteRxEvent}s. The layout it reads is: one protocol-version byte,
 * one name-length byte, the UTF-8 name bytes (when the length is positive),
 * then zero or more events. Each event is a 4-byte length, a 1-byte operation
 * code and an operation-specific body. Heartbeats (operation 6) are consumed
 * and not forwarded. Messages that are not a readable {@code ByteBuf} are
 * passed to the next handler untouched.
 *
 * <p>The second handler frames outbound writes: {@code ByteBuf} and
 * {@code byte[]} messages are copied into a new buffer behind the header
 * produced by {@code writeHeader}; {@code byte[]} writes are flushed
 * immediately. Any other message type is written through unchanged.
 *
 * @param pipeline the pipeline to append the two handlers to
 */
public void configureNewPipeline(ChannelPipeline pipeline) {
    pipeline.addLast(new ChannelDuplexHandler() {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            if (!(msg instanceof ByteBuf) || !((ByteBuf) msg).isReadable()) {
                // Not ours to decode: hand it to the next handler as-is.
                super.channelRead(ctx, msg);
                return;
            }
            ByteBuf byteBuf = (ByteBuf) msg;
            try {
                int protocolVersion = byteBuf.readByte();
                if (protocolVersion != PROTOCOL_VERSION) {
                    throw new RuntimeException("Unsupported protocol version: " + protocolVersion);
                }
                // NOTE(review): the length is read as a signed byte, so a name longer
                // than 127 bytes would look non-positive here and be skipped —
                // confirm against what writeHeader emits.
                int observableNameLength = byteBuf.readByte();
                String observableName = null;
                if (observableNameLength > 0) {
                    byte[] observableNameBytes = new byte[observableNameLength];
                    byteBuf.readBytes(observableNameBytes);
                    observableName = new String(observableNameBytes, Charset.forName("UTF-8"));
                }
                while (byteBuf.isReadable()) {
                    int lengthOfEvent = byteBuf.readInt();
                    int operation = byteBuf.readByte();
                    RemoteRxEvent.Type type = null;
                    Map<String, String> subscribeParams = null;
                    byte[] valueData = null;
                    boolean carriesPayload = false;
                    switch (operation) {
                        case 1:
                            logger.debug("READ request for RemoteRxEvent: next");
                            type = RemoteRxEvent.Type.next;
                            carriesPayload = true;
                            break;
                        case 2:
                            logger.debug("READ request for RemoteRxEvent: error");
                            type = RemoteRxEvent.Type.error;
                            carriesPayload = true;
                            break;
                        case 3:
                            logger.debug("READ request for RemoteRxEvent: completed");
                            type = RemoteRxEvent.Type.completed;
                            break;
                        case 4:
                            logger.debug("READ request for RemoteRxEvent: subscribed");
                            type = RemoteRxEvent.Type.subscribed;
                            // Subscribe events carry their own length-prefixed parameter block.
                            int subscribeParamsLength = byteBuf.readInt();
                            if (subscribeParamsLength > 0) {
                                byte[] subscribeParamsBytes = new byte[subscribeParamsLength];
                                byteBuf.readBytes(subscribeParamsBytes);
                                subscribeParams = fromBytesToMap(subscribeParamsBytes);
                            }
                            break;
                        case 5:
                            logger.debug("READ request for RemoteRxEvent: unsubscribed");
                            type = RemoteRxEvent.Type.unsubscribed;
                            break;
                        case 6:
                            // Heartbeat: nothing to decode and nothing is forwarded below.
                            logger.debug("READ request for RemoteRxEvent: heartbeat");
                            break;
                        case 7:
                            logger.debug("READ request for RemoteRxEvent: nonDataError");
                            type = RemoteRxEvent.Type.nonDataError;
                            carriesPayload = true;
                            break;
                        default:
                            throw new RuntimeException("operation: " + operation + " not support.");
                    }
                    if (carriesPayload) {
                        // The event length includes the op code byte already consumed.
                        valueData = new byte[lengthOfEvent - 1];
                        byteBuf.readBytes(valueData);
                    }
                    // don't send heartbeats through pipeline
                    if (operation != 6) {
                        ctx.fireChannelRead(new RemoteRxEvent(observableName, type, valueData, subscribeParams));
                    }
                }
            } finally {
                // This handler consumes the buffer, so it must release it even when
                // decoding fails part-way; otherwise the buffer leaks.
                byteBuf.release();
            }
        }
    });
    pipeline.addLast(new ChannelDuplexHandler() {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg,
                          ChannelPromise promise) throws Exception {
            if (msg instanceof ByteBuf) {
                // handle data writes
                ByteBuf bytes = (ByteBuf) msg;
                ByteBuf buf = ctx.alloc().buffer(bytes.readableBytes());
                try {
                    writeHeader(buf, name);
                    buf.writeBytes(bytes);
                } catch (Exception e) {
                    // The framed buffer was never handed downstream; free it here.
                    buf.release();
                    throw e;
                } finally {
                    // The source buffer has been copied (or abandoned) either way.
                    bytes.release();
                }
                super.write(ctx, buf, promise);
            } else if (msg instanceof byte[]) {
                // handle heart beat writes
                ByteBuf buf = ctx.alloc().buffer();
                try {
                    writeHeader(buf, name);
                    buf.writeBytes((byte[]) msg);
                } catch (Exception e) {
                    // The framed buffer was never handed downstream; free it here.
                    buf.release();
                    throw e;
                }
                super.write(ctx, buf, promise);
                // Heartbeats are pushed out immediately rather than waiting for a flush.
                super.flush(ctx);
            } else {
                super.write(ctx, msg, promise);
            }
        }
    });
}