private void negotiate()

in driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java [182:337]


    /**
     * Runs the MySQL connection-phase handshake on the given channel.
     * <p>
     * Steps, in order:
     * <ol>
     * <li>read the server's initial packet and reject error / EOF / unknown
     * negative markers;</li>
     * <li>when an SSL mode other than {@code DISABLED} is configured, send an
     * SSL request and replace the channel with the SSL-wrapped one (also
     * stored in {@code this.channel});</li>
     * <li>for a protocol version other than {@code MSC.DEFAULT_PROTOCOL_VERSION},
     * delegate to {@code auth323} and return;</li>
     * <li>otherwise send the client authentication packet and process the
     * server's reply, including auth-switch and "more data" exchanges for
     * {@code mysql_clear_password}, {@code mysql_native_password} and
     * {@code caching_sha2_password}.</li>
     * </ol>
     * Every packet read in this method is bounded by {@code timeout}.
     *
     * @param channel channel connected to the MySQL server
     * @throws IOException if the server answers with an error or unexpected
     *         packet, if SSL is requested but not advertised by the server, or
     *         if reading/writing the channel fails
     * @throws RuntimeException if the password scramble cannot be computed
     *         (wraps {@code NoSuchAlgorithmException} / {@code DigestException})
     */
    private void negotiate(SocketChannel channel) throws IOException {
        // https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol
        HeaderPacket header = PacketManager.readHeader(channel, 4, timeout);
        byte[] body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
        if (body[0] < 0) {// check field_count
            // bytes are signed: -1 is 0xFF, -2 is 0xFE
            if (body[0] == -1) {
                ErrorPacket error = new ErrorPacket();
                error.fromBytes(body);
                throw new IOException("handshake exception:\n" + error.toString());
            } else if (body[0] == -2) {
                throw new IOException("Unexpected EOF packet at handshake phase.");
            } else {
                throw new IOException("Unexpected packet with field_count=" + body[0]);
            }
        }
        HandshakeInitializationPacket handshakePacket = new HandshakeInitializationPacket();
        handshakePacket.fromBytes(body);
        // default utf8(33)
        byte serverCharsetNumber = (handshakePacket.serverCharsetNumber != 0) ? handshakePacket.serverCharsetNumber : 33;
        SslMode sslMode = sslInfo != null ? sslInfo.getSslMode() : SslMode.DISABLED;
        if (sslMode != SslMode.DISABLED) {
            boolean serverSupportSsl = (handshakePacket.serverCapabilities & CLIENT_SSL) > 0;
            if (!serverSupportSsl) {
                throw new IOException("MySQL Server does not support SSL: " + address + " serverCapabilities: "
                                      + handshakePacket.serverCapabilities);
            }
            byte[] sslPacket = new SslRequestCommandPacket(serverCharsetNumber).toBytes();
            HeaderPacket sslHeader = new HeaderPacket();
            sslHeader.setPacketBodyLength(sslPacket.length);
            sslHeader.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
            // Advance the handshake header as well, so the "+ 1" computed for the
            // next client packet below continues after the SSL request.
            header.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
            PacketManager.writePkg(channel, sslHeader.toBytes(), sslPacket);
            // From here on both the local variable and the field refer to the
            // SSL-wrapped channel.
            channel = SocketChannelPool.connectSsl(channel, sslInfo);
            this.channel = channel;
        }
        if (handshakePacket.protocolVersion != MSC.DEFAULT_PROTOCOL_VERSION) {
            // HandshakeV9
            // Note: connectionId and serverVersion are not recorded on this path.
            auth323(channel, (byte) (header.getPacketSequenceNumber() + 1), handshakePacket.seed);
            return;
        }

        connectionId = handshakePacket.threadId; // remember the connection id
        serverVersion = handshakePacket.serverVersion; // remember the server version
        logger.info("handshake initialization packet received, prepare the client authentication packet to send");
        // Some servers speaking an older protocol do not return an auth plugin by
        // default; fall back to mysql_native_password in that case.
        String authPluginName = "mysql_native_password";
        if (handshakePacket.authPluginName != null && handshakePacket.authPluginName.length > 0) {
            authPluginName = new String(handshakePacket.authPluginName);
        }
        logger.info("auth plugin: {}", authPluginName);
        boolean isSha2Password = false;
        ClientAuthenticationPacket clientAuth;
        if ("caching_sha2_password".equals(authPluginName)) {
            clientAuth = new ClientAuthenticationSHA2Packet();
            isSha2Password = true;
        } else {
            clientAuth = new ClientAuthenticationPacket();
        }
        clientAuth.setCharsetNumber(serverCharsetNumber);

        clientAuth.setUsername(username);
        clientAuth.setPassword(password);
        clientAuth.setServerCapabilities(handshakePacket.serverCapabilities);
        clientAuth.setDatabaseName(defaultSchema);
        clientAuth.setScrumbleBuff(joinAndCreateScrumbleBuff(handshakePacket));
        clientAuth.setAuthPluginName(authPluginName.getBytes());

        byte[] clientAuthPkgBody = clientAuth.toBytes();
        HeaderPacket h = new HeaderPacket();
        h.setPacketBodyLength(clientAuthPkgBody.length);
        h.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));

        PacketManager.writePkg(channel, h.toBytes(), clientAuthPkgBody);
        logger.info("client authentication packet is sent out.");

        // check auth result
        // Bound the header read with the same timeout that is applied to the
        // body read and to the initial handshake read.
        header = PacketManager.readHeader(channel, 4, timeout);
        body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
        assert body != null;
        byte marker = body[0];
        // -2 (0xFE) is parsed below as an auth switch request, 1 as an
        // "auth more data" packet; any other marker skips straight to the final check.
        if (marker == -2 || marker == 1) {
            // NOTE(review): body[1] is read without a length check; this assumes
            // the packet carries at least two bytes - confirm against the server side.
            if (isSha2Password && body[1] == 3) {
                // sha2 auth ok
                logger.info("caching_sha2_password auth success.");
                // read the packet that follows, it is validated at the end of the method
                header = PacketManager.readHeader(channel, 4, timeout);
                body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
            } else {
                byte[] authData = null;
                String pluginName = authPluginName;
                if (marker == 1) {
                    // more data for the plugin already in use: keep the plugin name
                    AuthSwitchRequestMoreData packet = new AuthSwitchRequestMoreData();
                    packet.fromBytes(body);
                    authData = packet.authData;
                } else {
                    // the server asks to switch to the plugin named in the packet
                    AuthSwitchRequestPacket packet = new AuthSwitchRequestPacket();
                    packet.fromBytes(body);
                    authData = packet.authData;
                    pluginName = packet.authName;
                    logger.info("auth switch pluginName is {}.", pluginName);
                }

                byte[] encryptedPassword = null;
                if ("mysql_clear_password".equals(pluginName)) {
                    // the password bytes are sent as-is for this plugin
                    encryptedPassword = getPassword().getBytes();
                    header = authSwitchAfterAuth(encryptedPassword, header);
                    body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
                } else if (pluginName == null || "mysql_native_password".equals(pluginName)) {
                    try {
                        encryptedPassword = MySQLPasswordEncrypter.scramble411(getPassword().getBytes(), authData);
                    } catch (NoSuchAlgorithmException e) {
                        throw new RuntimeException("can't encrypt password that will be sent to MySQL server.", e);
                    }
                    header = authSwitchAfterAuth(encryptedPassword, header);
                    body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
                } else if ("caching_sha2_password".equals(pluginName)) {
                    if (body[0] == 0x01 && body[1] == 0x04) {
                        // support full auth
                        // clientAuth already used the sha2 scramble up front, which
                        // saves one auth round trip
                        header = cachingSha2PasswordFullAuth(channel,
                            header,
                            getPassword().getBytes(),
                            clientAuth.getScrumbleBuff());
                        body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
                    } else {
                        byte[] scramble = authData;
                        try {
                            encryptedPassword = MySQLPasswordEncrypter.scrambleCachingSha2(getPassword().getBytes(),
                                    scramble);
                        } catch (DigestException e) {
                            throw new RuntimeException("can't encrypt password that will be sent to MySQL server.", e);
                        }
                        header = authSwitchAfterAuth(encryptedPassword, header);
                        body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
                        assert body != null;
                        if (body[0] == 0x01 && body[1] == 0x04) {
                            // fixed issue https://github.com/alibaba/canal/pull/4767, support mysql 8.0.30+
                            header = cachingSha2PasswordFullAuth(channel, header, getPassword().getBytes(), scramble);
                            body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
                        }
                    }
                } else {
                    // NOTE(review): for any other plugin encryptedPassword is still
                    // null here - verify that authSwitchAfterAuth tolerates null.
                    header = authSwitchAfterAuth(encryptedPassword, header);
                    body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
                }
            }
        }
        // final verdict: a negative first byte means the authentication did not succeed
        if (body[0] < 0) {
            if (body[0] == -1) {
                ErrorPacket err = new ErrorPacket();
                err.fromBytes(body);
                throw new IOException("Error When doing Client Authentication:" + err.toString());
            } else {
                throw new IOException("Unexpected packet with field_count=" + body[0]);
            }
        }
    }