public void configureNewPipeline()

in mantis-network/src/main/java/io/reactivex/mantis/network/push/LegacyTcpPipelineConfigurator.java [97:221]


    public void configureNewPipeline(ChannelPipeline pipeline) {

        pipeline.addLast(new ChannelDuplexHandler() {

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg)
                    throws Exception {
                boolean handled = false;
                if (ByteBuf.class.isAssignableFrom(msg.getClass())) {


                    ByteBuf byteBuf = (ByteBuf) msg;
                    if (byteBuf.isReadable()) {
                        int protocolVersion = byteBuf.readByte();
                        if (protocolVersion != PROTOCOL_VERSION) {
                            throw new RuntimeException("Unsupported protocol version: " + protocolVersion);
                        }
                        int observableNameLength = byteBuf.readByte();
                        String observableName = null;
                        if (observableNameLength > 0) {
                            // read name
                            byte[] observableNameBytes = new byte[observableNameLength];
                            byteBuf.readBytes(observableNameBytes);
                            observableName = new String(observableNameBytes, Charset.forName("UTF-8"));
                        }

                        while (byteBuf.isReadable()) {
                            int lengthOfEvent = byteBuf.readInt();
                            int operation = byteBuf.readByte();
                            RemoteRxEvent.Type type = null;
                            Map<String, String> subscribeParams = null;
                            byte[] valueData = null;
                            if (operation == 1) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("READ request for RemoteRxEvent: next");
                                }
                                type = RemoteRxEvent.Type.next;
                                valueData = new byte[lengthOfEvent - 1]; //subtract op code
                                byteBuf.readBytes(valueData);
                            } else if (operation == 2) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("READ request for RemoteRxEvent: error");
                                }
                                type = RemoteRxEvent.Type.error;
                                valueData = new byte[lengthOfEvent - 1];
                                byteBuf.readBytes(valueData);
                            } else if (operation == 3) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("READ request for RemoteRxEvent: completed");
                                }
                                type = RemoteRxEvent.Type.completed;
                            } else if (operation == 4) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("READ request for RemoteRxEvent: subscribed");
                                }
                                type = RemoteRxEvent.Type.subscribed;
                                // read subscribe parameters
                                int subscribeParamsLength = byteBuf.readInt();
                                if (subscribeParamsLength > 0) {
                                    // read byte into map
                                    byte[] subscribeParamsBytes = new byte[subscribeParamsLength];
                                    byteBuf.readBytes(subscribeParamsBytes);
                                    subscribeParams = fromBytesToMap(subscribeParamsBytes);
                                }
                            } else if (operation == 5) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("READ request for RemoteRxEvent: unsubscribed");
                                }
                                type = RemoteRxEvent.Type.unsubscribed;
                            } else if (operation == 6) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("READ request for RemoteRxEvent: heartbeat");
                                }
                            } else if (operation == 7) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("READ request for RemoteRxEvent: nonDataError");
                                }
                                type = RemoteRxEvent.Type.nonDataError;
                                valueData = new byte[lengthOfEvent - 1];
                                byteBuf.readBytes(valueData);
                            } else {
                                throw new RuntimeException("operation: " + operation + " not support.");
                            }
                            // don't send heartbeats through pipeline
                            if (operation != 6) {
                                ctx.fireChannelRead(new RemoteRxEvent(observableName, type, valueData, subscribeParams));
                            }
                        }
                        handled = true;
                        byteBuf.release();
                    }
                }
                if (!handled) {
                    super.channelRead(ctx, msg);
                }

            }
        });

        pipeline.addLast(new ChannelDuplexHandler() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg,
                              ChannelPromise promise) throws Exception {

                if (ByteBuf.class.isAssignableFrom(msg.getClass())) {
                    // handle data writes
                    ByteBuf bytes = (ByteBuf) msg;
                    ByteBuf buf = ctx.alloc().buffer(bytes.readableBytes());
                    writeHeader(buf, name);
                    buf.writeBytes(bytes);
                    bytes.release();
                    super.write(ctx, buf, promise);
                } else if (msg instanceof byte[]) {
                    // handle heart beat writes
                    ByteBuf buf = ctx.alloc().buffer();
                    writeHeader(buf, name);
                    buf.writeBytes((byte[]) msg);
                    super.write(ctx, buf, promise);
                    super.flush(ctx);
                } else {
                    super.write(ctx, msg, promise);
                }
            }
        });
    }