mantis-network/src/main/java/io/reactivex/mantis/network/push/LegacyTcpPipelineConfigurator.java [50:129]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Deserializes a Java-serialized byte array into a string-to-string map.
     *
     * <p>The payload is read with {@link ObjectInputStream#readObject()} and cast
     * to {@code Map<String, String>}; the generic parameters are not verified at
     * runtime, hence the unchecked suppression.
     *
     * @param bytes the Java-serialized form of a map
     * @return the deserialized map, or {@code null} if the stream encodes a null reference
     * @throws RuntimeException wrapping the underlying {@link IOException} or
     *         {@link ClassNotFoundException} when the bytes cannot be deserialized
     */
    @SuppressWarnings("unchecked")
    static Map<String, String> fromBytesToMap(byte[] bytes) {
        // NOTE(review): native Java deserialization is unsafe on untrusted input.
        // If these bytes can originate from a remote peer, consider a safer wire
        // format or an allow-list filter; behavior is intentionally left unchanged here.
        //
        // try-with-resources closes the object stream (and the wrapped byte stream)
        // exactly once, and a failure in close() can no longer mask the primary error.
        try (ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Map<String, String>) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Serializes a string-to-string map to bytes using Java object serialization.
     *
     * @param map the map to serialize; it is written with
     *            {@link ObjectOutputStream#writeObject(Object)}
     * @return the Java-serialized form of {@code map}
     * @throws RuntimeException wrapping the underlying {@link IOException} when
     *         the map cannot be written (for example, it is not serializable)
     */
    static byte[] fromMapToBytes(Map<String, String> map) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // The object stream must be closed before the buffer is read so that any
        // data it still holds is flushed into baos; try-with-resources guarantees
        // that and reports a close() failure through the same catch clause
        // instead of printing to stderr.
        try (ObjectOutput out = new ObjectOutputStream(baos)) {
            out.writeObject(map);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return baos.toByteArray();
    }

    @Override
    public void configureNewPipeline(ChannelPipeline pipeline) {

        pipeline.addLast(new ChannelDuplexHandler() {

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg)
                    throws Exception {
                boolean handled = false;
                if (ByteBuf.class.isAssignableFrom(msg.getClass())) {


                    ByteBuf byteBuf = (ByteBuf) msg;
                    if (byteBuf.isReadable()) {
                        int protocolVersion = byteBuf.readByte();
                        if (protocolVersion != PROTOCOL_VERSION) {
                            throw new RuntimeException("Unsupported protocol version: " + protocolVersion);
                        }
                        int observableNameLength = byteBuf.readByte();
                        String observableName = null;
                        if (observableNameLength > 0) {
                            // read name
                            byte[] observableNameBytes = new byte[observableNameLength];
                            byteBuf.readBytes(observableNameBytes);
                            observableName = new String(observableNameBytes, Charset.forName("UTF-8"));
                        }

                        while (byteBuf.isReadable()) {
                            int lengthOfEvent = byteBuf.readInt();
                            int operation = byteBuf.readByte();
                            RemoteRxEvent.Type type = null;
                            Map<String, String> subscribeParams = null;
                            byte[] valueData = null;
                            if (operation == 1) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/BatchedRxEventPipelineConfigurator.java [44:124]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Deserializes a Java-serialized byte array into a string-to-string map.
     *
     * <p>The payload is read with {@link ObjectInputStream#readObject()} and cast
     * to {@code Map<String, String>}; the generic parameters are not verified at
     * runtime, hence the unchecked suppression.
     *
     * @param bytes the Java-serialized form of a map
     * @return the deserialized map, or {@code null} if the stream encodes a null reference
     * @throws RuntimeException wrapping the underlying {@link IOException} or
     *         {@link ClassNotFoundException} when the bytes cannot be deserialized
     */
    @SuppressWarnings("unchecked")
    static Map<String, String> fromBytesToMap(byte[] bytes) {
        // NOTE(review): native Java deserialization is unsafe on untrusted input.
        // If these bytes can originate from a remote peer, consider a safer wire
        // format or an allow-list filter; behavior is intentionally left unchanged here.
        //
        // try-with-resources closes the object stream (and the wrapped byte stream)
        // exactly once, and a failure in close() can no longer mask the primary error.
        try (ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Map<String, String>) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Serializes a string-to-string map to bytes using Java object serialization.
     *
     * @param map the map to serialize; it is written with
     *            {@link ObjectOutputStream#writeObject(Object)}
     * @return the Java-serialized form of {@code map}
     * @throws RuntimeException wrapping the underlying {@link IOException} when
     *         the map cannot be written (for example, it is not serializable)
     */
    static byte[] fromMapToBytes(Map<String, String> map) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // The object stream must be closed before the buffer is read so that any
        // data it still holds is flushed into baos; try-with-resources guarantees
        // that and reports a close() failure through the same catch clause
        // instead of printing to stderr.
        try (ObjectOutput out = new ObjectOutputStream(baos)) {
            out.writeObject(map);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return baos.toByteArray();
    }

    @Override
    public void configureNewPipeline(ChannelPipeline pipeline) {


        pipeline.addLast(new ChannelDuplexHandler() {

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg)
                    throws Exception {
                boolean handled = false;
                if (ByteBuf.class.isAssignableFrom(msg.getClass())) {


                    ByteBuf byteBuf = (ByteBuf) msg;
                    if (byteBuf.isReadable()) {
                        int protocolVersion = byteBuf.readByte();
                        if (protocolVersion != PROTOCOL_VERSION) {
                            throw new RuntimeException("Unsupported protocol version: " + protocolVersion);
                        }
                        int observableNameLength = byteBuf.readByte();
                        String observableName = null;
                        if (observableNameLength > 0) {
                            // read name
                            byte[] observableNameBytes = new byte[observableNameLength];
                            byteBuf.readBytes(observableNameBytes);
                            observableName = new String(observableNameBytes, Charset.forName("UTF-8"));
                        }

                        while (byteBuf.isReadable()) {
                            int lengthOfEvent = byteBuf.readInt();
                            int operation = byteBuf.readByte();
                            RemoteRxEvent.Type type = null;
                            Map<String, String> subscribeParams = null;
                            byte[] valueData = null;
                            if (operation == 1) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



