public void messageReceived()

in server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java [54:339]


    /**
     * Handles one inbound client packet.
     * <p>
     * The readable bytes of the incoming {@link ChannelBuffer} are decoded as a
     * {@code Packet} and dispatched on its type:
     * <ul>
     * <li>{@code SUBSCRIPTION} - starts the destination's running monitor when the
     * embedded server reports the destination as not started, subscribes the client
     * and writes an ack packet.</li>
     * <li>{@code UNSUBSCRIPTION} - unsubscribes the client, calls
     * {@code stopCanalInstanceIfNecessary} and writes an ack packet.</li>
     * <li>{@code GET} - fetches a batch via {@code getWithoutAck} (with or without a
     * timeout) and writes it back as a {@code MESSAGES} packet.</li>
     * <li>{@code CLIENTACK} - acks the batch; batch id {@code 0} is rejected with a
     * 402 error packet, batch id {@code -1} is ignored. Nothing is written to the
     * channel on success.</li>
     * <li>{@code CLIENTROLLBACK} - rolls back all batches (batch id {@code 0}) or the
     * given batch. Nothing is written to the channel on success.</li>
     * <li>any other type - a 400 error packet is written.</li>
     * </ul>
     * A request whose destination or client id is empty is answered with a 401 error
     * packet. Any {@link Throwable} raised while processing a decoded packet is
     * reported to the client as a 400 error packet that carries the stack trace.
     *
     * @param ctx the channel handler context; responses are written to its channel
     * @param e the message event whose payload is a {@link ChannelBuffer}
     * @throws Exception if the outer {@code Packet} cannot be decoded (the decode step
     *         runs before the try block, so its failures are not turned into an error
     *         packet here)
     */
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        logger.info("message receives in session handler...");
        // Start time; the elapsed nanos are handed to every ChannelFutureAggregator below.
        long start = System.nanoTime();
        ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
        Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
        ClientIdentity clientIdentity = null;
        try {
            switch (packet.getType()) {
                case SUBSCRIPTION:
                    Sub sub = Sub.parseFrom(packet.getBody());
                    if (StringUtils.isNotEmpty(sub.getDestination()) && StringUtils.isNotEmpty(sub.getClientId())) {
                        clientIdentity = new ClientIdentity(sub.getDestination(),
                            Short.valueOf(sub.getClientId()),
                            sub.getFilter());
                        MDC.put("destination", clientIdentity.getDestination());

                        // Try to start the instance; skip if it is already running.
                        if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
                            if (!runningMonitor.isStart()) {
                                runningMonitor.start();
                            }
                        }

                        embeddedServer.subscribe(clientIdentity);
                        // ctx.setAttachment(clientIdentity);// set state data
                        byte[] ackBytes = NettyUtils.ackPacket();
                        NettyUtils.write(ctx.getChannel(), ackBytes, new ChannelFutureAggregator(sub.getDestination(),
                            sub,
                            packet.getType(),
                            ackBytes.length,
                            System.nanoTime() - start));
                    } else {
                        byte[] errorBytes = NettyUtils.errorPacket(401,
                            MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage());
                        NettyUtils.write(ctx.getChannel(),
                            errorBytes,
                            new ChannelFutureAggregator(sub.getDestination(),
                                sub,
                                packet.getType(),
                                errorBytes.length,
                                System.nanoTime() - start,
                                (short) 401));
                    }
                    break;
                case UNSUBSCRIPTION:
                    Unsub unsub = Unsub.parseFrom(packet.getBody());
                    if (StringUtils.isNotEmpty(unsub.getDestination()) && StringUtils.isNotEmpty(unsub.getClientId())) {
                        clientIdentity = new ClientIdentity(unsub.getDestination(),
                            Short.valueOf(unsub.getClientId()),
                            unsub.getFilter());
                        MDC.put("destination", clientIdentity.getDestination());
                        embeddedServer.unsubscribe(clientIdentity);
                        stopCanalInstanceIfNecessary(clientIdentity);// try to stop the instance
                        byte[] ackBytes = NettyUtils.ackPacket();
                        NettyUtils.write(ctx.getChannel(),
                            ackBytes,
                            new ChannelFutureAggregator(unsub.getDestination(),
                                unsub,
                                packet.getType(),
                                ackBytes.length,
                                System.nanoTime() - start));
                    } else {
                        byte[] errorBytes = NettyUtils.errorPacket(401,
                            MessageFormatter.format("destination or clientId is null", unsub.toString()).getMessage());
                        NettyUtils.write(ctx.getChannel(),
                            errorBytes,
                            new ChannelFutureAggregator(unsub.getDestination(),
                                unsub,
                                packet.getType(),
                                errorBytes.length,
                                System.nanoTime() - start,
                                (short) 401));
                    }
                    break;
                case GET:
                    Get get = CanalPacket.Get.parseFrom(packet.getBody());
                    if (StringUtils.isNotEmpty(get.getDestination()) && StringUtils.isNotEmpty(get.getClientId())) {
                        clientIdentity = new ClientIdentity(get.getDestination(), Short.valueOf(get.getClientId()));
                        MDC.put("destination", clientIdentity.getDestination());
                        Message message = null;

                        // Disabled auto-ack variant, kept for reference:
                        // if (get.getAutoAck()) {
                        // if (get.getTimeout() == -1) {// is it the initial value?
                        // message = embeddedServer.get(clientIdentity,
                        // get.getFetchSize());
                        // } else {
                        // TimeUnit unit = convertTimeUnit(get.getUnit());
                        // message = embeddedServer.get(clientIdentity,
                        // get.getFetchSize(), get.getTimeout(), unit);
                        // }
                        // } else {
                        if (get.getTimeout() == -1) {// is it the initial (unset) value?
                            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
                        } else {
                            TimeUnit unit = convertTimeUnit(get.getUnit());
                            message = embeddedServer.getWithoutAck(clientIdentity,
                                get.getFetchSize(),
                                get.getTimeout(),
                                unit);
                        }
                        // }

                        if (message.getId() != -1 && message.isRaw()) {
                            // Raw batch: encode the MESSAGES packet by hand, writing the raw
                            // entries straight into one exactly-sized byte array instead of
                            // going through the protobuf builders.
                            List<ByteString> rowEntries = message.getRawEntries();
                            // message size: field 1 (batch id) + every entry as field 2
                            int messageSize = 0;
                            messageSize += com.google.protobuf.CodedOutputStream.computeInt64Size(1, message.getId());

                            int dataSize = 0;
                            for (ByteString rowEntry : rowEntries) {
                                dataSize += CodedOutputStream.computeBytesSizeNoTag(rowEntry);
                            }
                            messageSize += dataSize;
                            // one tag byte per entry (field 2, length-delimited)
                            messageSize += 1 * rowEntries.size();
                            // packet size: field 3 (type enum) + field 5 (length-delimited body)
                            int size = 0;
                            size += com.google.protobuf.CodedOutputStream.computeEnumSize(3,
                                PacketType.MESSAGES.getNumber());
                            size += com.google.protobuf.CodedOutputStream.computeTagSize(5)
                                    + com.google.protobuf.CodedOutputStream.computeRawVarint32Size(messageSize)
                                    + messageSize;
                            // Disabled buffer-recycling variant, kept for reference:
                            // ByteBuffer byteBuffer = (ByteBuffer)
                            // ctx.getAttachment();
                            // if (byteBuffer != null && size <=
                            // byteBuffer.capacity()) {
                            // byteBuffer.clear();
                            // } else {
                            // byteBuffer =
                            // ByteBuffer.allocate(size).order(ByteOrder.BIG_ENDIAN);
                            // ctx.setAttachment(byteBuffer);
                            // }
                            // CodedOutputStream output =
                            // CodedOutputStream.newInstance(byteBuffer);
                            byte[] body = new byte[size];
                            CodedOutputStream output = CodedOutputStream.newInstance(body);
                            output.writeEnum(3, PacketType.MESSAGES.getNumber());

                            output.writeTag(5, WireFormat.WIRETYPE_LENGTH_DELIMITED);
                            output.writeRawVarint32(messageSize);
                            // message
                            output.writeInt64(1, message.getId());
                            for (ByteString rowEntry : rowEntries) {
                                output.writeBytes(2, rowEntry);
                            }
                            // Fails if the size computed above does not match what was written.
                            output.checkNoSpaceLeft();
                            // The last argument is always false on this branch (id != -1 was checked above).
                            NettyUtils.write(ctx.getChannel(), body, new ChannelFutureAggregator(get.getDestination(),
                                get,
                                packet.getType(),
                                body.length,
                                System.nanoTime() - start,
                                message.getId() == -1));

                            // output.flush();
                            // byteBuffer.flip();
                            // NettyUtils.write(ctx.getChannel(), byteBuffer,
                            // null);
                        } else {
                            // Empty batch (id == -1) or non-raw entries: build the packet with
                            // the generated protobuf builders.
                            Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
                            packetBuilder.setType(PacketType.MESSAGES).setVersion(NettyUtils.VERSION);

                            Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
                            messageBuilder.setBatchId(message.getId());
                            if (message.getId() != -1) {
                                if (message.isRaw() && !CollectionUtils.isEmpty(message.getRawEntries())) {
                                    messageBuilder.addAllMessages(message.getRawEntries());
                                } else if (!CollectionUtils.isEmpty(message.getEntries())) {
                                    for (Entry entry : message.getEntries()) {
                                        messageBuilder.addMessages(entry.toByteString());
                                    }
                                }
                            }
                            byte[] body = packetBuilder.setBody(messageBuilder.build().toByteString())
                                .build()
                                .toByteArray();
                            NettyUtils.write(ctx.getChannel(), body, new ChannelFutureAggregator(get.getDestination(),
                                get,
                                packet.getType(),
                                body.length,
                                System.nanoTime() - start,
                                message.getId() == -1));// write the data out
                        }
                    } else {
                        byte[] errorBytes = NettyUtils.errorPacket(401,
                            MessageFormatter.format("destination or clientId is null", get.toString()).getMessage());
                        NettyUtils.write(ctx.getChannel(),
                            errorBytes,
                            new ChannelFutureAggregator(get.getDestination(),
                                get,
                                packet.getType(),
                                errorBytes.length,
                                System.nanoTime() - start,
                                (short) 401));
                    }
                    break;
                case CLIENTACK:
                    ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody());
                    MDC.put("destination", ack.getDestination());
                    if (StringUtils.isNotEmpty(ack.getDestination()) && StringUtils.isNotEmpty(ack.getClientId())) {
                        if (ack.getBatchId() == 0L) {
                            byte[] errorBytes = NettyUtils.errorPacket(402,
                                MessageFormatter.format("batchId should assign value", ack.toString()).getMessage());
                            NettyUtils.write(ctx.getChannel(),
                                errorBytes,
                                new ChannelFutureAggregator(ack.getDestination(),
                                    ack,
                                    packet.getType(),
                                    errorBytes.length,
                                    System.nanoTime() - start,
                                    (short) 402));
                        } else if (ack.getBatchId() == -1L) { // -1 means the previous get returned no data; just ignore it
                            // do nothing
                        } else {
                            clientIdentity = new ClientIdentity(ack.getDestination(), Short.valueOf(ack.getClientId()));
                            embeddedServer.ack(clientIdentity, ack.getBatchId());
                            // Nothing is written back for an ack, so the aggregator is
                            // completed directly with a zero-byte payload.
                            new ChannelFutureAggregator(ack.getDestination(),
                                ack,
                                packet.getType(),
                                0,
                                System.nanoTime() - start).operationComplete(null);
                        }
                    } else {
                        byte[] errorBytes = NettyUtils.errorPacket(401,
                            MessageFormatter.format("destination or clientId is null", ack.toString()).getMessage());
                        NettyUtils.write(ctx.getChannel(),
                            errorBytes,
                            new ChannelFutureAggregator(ack.getDestination(),
                                ack,
                                packet.getType(),
                                errorBytes.length,
                                System.nanoTime() - start,
                                (short) 401));
                    }
                    break;
                case CLIENTROLLBACK:
                    ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());
                    MDC.put("destination", rollback.getDestination());
                    if (StringUtils.isNotEmpty(rollback.getDestination())
                        && StringUtils.isNotEmpty(rollback.getClientId())) {
                        clientIdentity = new ClientIdentity(rollback.getDestination(),
                            Short.valueOf(rollback.getClientId()));
                        if (rollback.getBatchId() == 0L) {
                            embeddedServer.rollback(clientIdentity);// roll back all batches
                        } else {
                            embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // roll back only the single batch
                        }
                        // Nothing is written back for a rollback, so the aggregator is
                        // completed directly with a zero-byte payload.
                        new ChannelFutureAggregator(rollback.getDestination(),
                            rollback,
                            packet.getType(),
                            0,
                            System.nanoTime() - start).operationComplete(null);
                    } else {
                        byte[] errorBytes = NettyUtils.errorPacket(401,
                            MessageFormatter.format("destination or clientId is null", rollback.toString())
                                .getMessage());
                        NettyUtils.write(ctx.getChannel(),
                            errorBytes,
                            new ChannelFutureAggregator(rollback.getDestination(),
                                rollback,
                                packet.getType(),
                                errorBytes.length,
                                System.nanoTime() - start,
                                (short) 401));
                    }
                    break;
                default:
                    byte[] errorBytes = NettyUtils.errorPacket(400,
                        MessageFormatter.format("packet type={} is NOT supported!", packet.getType()).getMessage());
                    // String.valueOf: the remote address may be null once the peer has
                    // disconnected; calling toString() on it would throw here.
                    NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(
                        String.valueOf(ctx.getChannel().getRemoteAddress()),
                        null,
                        packet.getType(),
                        errorBytes.length,
                        System.nanoTime() - start,
                        (short) 400));
                    break;
            }
        } catch (Throwable exception) {
            byte[] errorBytes = NettyUtils.errorPacket(400,
                MessageFormatter.format("something goes wrong with channel:{}, exception={}",
                    ctx.getChannel(),
                    ExceptionUtils.getStackTrace(exception)).getMessage());
            // String.valueOf: a null remote address (peer already gone) must not raise a
            // NullPointerException here, which would mask the original failure.
            NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(
                String.valueOf(ctx.getChannel().getRemoteAddress()),
                null,
                packet.getType(),
                errorBytes.length,
                System.nanoTime() - start,
                (short) 400));
        } finally {
            // Always clear the logging context set by the branches above.
            MDC.remove("destination");
        }
    }