in driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java [182:337]
private void negotiate(SocketChannel channel) throws IOException {
// https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol
HeaderPacket header = PacketManager.readHeader(channel, 4, timeout);
byte[] body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
if (body[0] < 0) {// check field_count
if (body[0] == -1) {
ErrorPacket error = new ErrorPacket();
error.fromBytes(body);
throw new IOException("handshake exception:\n" + error.toString());
} else if (body[0] == -2) {
throw new IOException("Unexpected EOF packet at handshake phase.");
} else {
throw new IOException("Unexpected packet with field_count=" + body[0]);
}
}
HandshakeInitializationPacket handshakePacket = new HandshakeInitializationPacket();
handshakePacket.fromBytes(body);
// default utf8(33)
byte serverCharsetNumber = (handshakePacket.serverCharsetNumber != 0) ? handshakePacket.serverCharsetNumber : 33;
SslMode sslMode = sslInfo != null ? sslInfo.getSslMode() : SslMode.DISABLED;
if (sslMode != SslMode.DISABLED) {
boolean serverSupportSsl = (handshakePacket.serverCapabilities & CLIENT_SSL) > 0;
if (!serverSupportSsl) {
throw new IOException("MySQL Server does not support SSL: " + address + " serverCapabilities: "
+ handshakePacket.serverCapabilities);
}
byte[] sslPacket = new SslRequestCommandPacket(serverCharsetNumber).toBytes();
HeaderPacket sslHeader = new HeaderPacket();
sslHeader.setPacketBodyLength(sslPacket.length);
sslHeader.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
header.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
PacketManager.writePkg(channel, sslHeader.toBytes(), sslPacket);
channel = SocketChannelPool.connectSsl(channel, sslInfo);
this.channel = channel;
}
if (handshakePacket.protocolVersion != MSC.DEFAULT_PROTOCOL_VERSION) {
// HandshakeV9
auth323(channel, (byte) (header.getPacketSequenceNumber() + 1), handshakePacket.seed);
return;
}
connectionId = handshakePacket.threadId; // 记录一下connection
serverVersion = handshakePacket.serverVersion; // 记录serverVersion
logger.info("handshake initialization packet received, prepare the client authentication packet to send");
// 某些老协议的 server 默认不返回 auth plugin,需要使用默认的 mysql_native_password
String authPluginName = "mysql_native_password";
if (handshakePacket.authPluginName != null && handshakePacket.authPluginName.length > 0) {
authPluginName = new String(handshakePacket.authPluginName);
}
logger.info("auth plugin: {}", authPluginName);
boolean isSha2Password = false;
ClientAuthenticationPacket clientAuth;
if ("caching_sha2_password".equals(authPluginName)) {
clientAuth = new ClientAuthenticationSHA2Packet();
isSha2Password = true;
} else {
clientAuth = new ClientAuthenticationPacket();
}
clientAuth.setCharsetNumber(serverCharsetNumber);
clientAuth.setUsername(username);
clientAuth.setPassword(password);
clientAuth.setServerCapabilities(handshakePacket.serverCapabilities);
clientAuth.setDatabaseName(defaultSchema);
clientAuth.setScrumbleBuff(joinAndCreateScrumbleBuff(handshakePacket));
clientAuth.setAuthPluginName(authPluginName.getBytes());
byte[] clientAuthPkgBody = clientAuth.toBytes();
HeaderPacket h = new HeaderPacket();
h.setPacketBodyLength(clientAuthPkgBody.length);
h.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
PacketManager.writePkg(channel, h.toBytes(), clientAuthPkgBody);
logger.info("client authentication packet is sent out.");
// check auth result
header = PacketManager.readHeader(channel, 4);
body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
assert body != null;
byte marker = body[0];
if (marker == -2 || marker == 1) {
if (isSha2Password && body[1] == 3) {
// sha2 auth ok
logger.info("caching_sha2_password auth success.");
header = PacketManager.readHeader(channel, 4);
body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
} else {
byte[] authData = null;
String pluginName = authPluginName;
if (marker == 1) {
AuthSwitchRequestMoreData packet = new AuthSwitchRequestMoreData();
packet.fromBytes(body);
authData = packet.authData;
} else {
AuthSwitchRequestPacket packet = new AuthSwitchRequestPacket();
packet.fromBytes(body);
authData = packet.authData;
pluginName = packet.authName;
logger.info("auth switch pluginName is {}.", pluginName);
}
byte[] encryptedPassword = null;
if ("mysql_clear_password".equals(pluginName)) {
encryptedPassword = getPassword().getBytes();
header = authSwitchAfterAuth(encryptedPassword, header);
body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
} else if (pluginName == null || "mysql_native_password".equals(pluginName)) {
try {
encryptedPassword = MySQLPasswordEncrypter.scramble411(getPassword().getBytes(), authData);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("can't encrypt password that will be sent to MySQL server.", e);
}
header = authSwitchAfterAuth(encryptedPassword, header);
body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
} else if ("caching_sha2_password".equals(pluginName)) {
if (body[0] == 0x01 && body[1] == 0x04) {
// support full auth
// clientAuth提前采用了sha2编码,会减少一次auth交互
header = cachingSha2PasswordFullAuth(channel,
header,
getPassword().getBytes(),
clientAuth.getScrumbleBuff());
body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
} else {
byte[] scramble = authData;
try {
encryptedPassword = MySQLPasswordEncrypter.scrambleCachingSha2(getPassword().getBytes(),
scramble);
} catch (DigestException e) {
throw new RuntimeException("can't encrypt password that will be sent to MySQL server.", e);
}
header = authSwitchAfterAuth(encryptedPassword, header);
body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
assert body != null;
if (body[0] == 0x01 && body[1] == 0x04) {
// fixed issue https://github.com/alibaba/canal/pull/4767, support mysql 8.0.30+
header = cachingSha2PasswordFullAuth(channel, header, getPassword().getBytes(), scramble);
body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
}
}
} else {
header = authSwitchAfterAuth(encryptedPassword, header);
body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
}
}
}
if (body[0] < 0) {
if (body[0] == -1) {
ErrorPacket err = new ErrorPacket();
err.fromBytes(body);
throw new IOException("Error When doing Client Authentication:" + err.toString());
} else {
throw new IOException("Unexpected packet with field_count=" + body[0]);
}
}
}