in Code/AuroraSLDataAPIDemo/aslbootstrap/pymysql/connections.py [0:0]
def _request_authentication(self):
    """Send the handshake response and drive authentication to completion.

    Builds the HandshakeResponse packet from the connection's state
    (client flags, charset, user name, auth response, optional default
    database, auth plugin name and connection attributes), writes it, and
    then reacts to the server's reply: an auth-switch request, extra
    authentication data, or neither.

    When SSL is configured and the server advertises ``CLIENT.SSL``, the
    fixed-size packet prefix is sent first and the socket is wrapped with
    the SSL context before the full response is written.

    Side effects: may update ``self.client_flag``, ``self.user``,
    ``self.db``, ``self._sock``, ``self._rfile`` and ``self._secure``.

    Raises:
        ValueError: if ``self.user`` is None.
        err.OperationalError: if the server sends extra auth data for a
            plugin other than ``caching_sha2_password`` or
            ``sha256_password``.
    """
    # https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::HandshakeResponse
    if int(self.server_version.split('.', 1)[0]) >= 5:
        self.client_flag |= CLIENT.MULTI_RESULTS

    if self.user is None:
        raise ValueError("Did not specify a username")

    charset_id = charset_by_name(self.charset).id
    if isinstance(self.user, text_type):
        self.user = self.user.encode(self.encoding)

    # Fixed-size prefix: client flags, max packet size, charset id and
    # 23 bytes of zero padding.
    data_init = struct.pack('<iIB23s', self.client_flag, MAX_PACKET_LEN, charset_id, b'')

    if self.ssl and self.server_capabilities & CLIENT.SSL:
        # Send only the prefix, then switch the socket to TLS before any
        # credentials are written.
        self.write_packet(data_init)

        self._sock = self.ctx.wrap_socket(self._sock, server_hostname=self.host)
        self._rfile = _makefile(self._sock, 'rb')
        self._secure = True

    data = data_init + self.user + b'\0'

    authresp = b''
    plugin_name = None

    if self._auth_plugin_name == '':
        plugin_name = b''
        authresp = _auth.scramble_native_password(self.password, self.salt)
    elif self._auth_plugin_name == 'mysql_native_password':
        plugin_name = b'mysql_native_password'
        authresp = _auth.scramble_native_password(self.password, self.salt)
    elif self._auth_plugin_name == 'caching_sha2_password':
        plugin_name = b'caching_sha2_password'
        if self.password:
            if DEBUG:
                print("caching_sha2: trying fast path")
            authresp = _auth.scramble_caching_sha2(self.password, self.salt)
        else:
            # An empty password is sent as an empty auth response.
            if DEBUG:
                print("caching_sha2: empty password")
    elif self._auth_plugin_name == 'sha256_password':
        plugin_name = b'sha256_password'
        if self.ssl and self.server_capabilities & CLIENT.SSL:
            authresp = self.password + b'\0'
        elif self.password:
            authresp = b'\1'  # request public key
        else:
            authresp = b'\0'  # empty password

    # The auth response is framed according to what the server supports.
    if self.server_capabilities & CLIENT.PLUGIN_AUTH_LENENC_CLIENT_DATA:
        data += lenenc_int(len(authresp)) + authresp
    elif self.server_capabilities & CLIENT.SECURE_CONNECTION:
        data += struct.pack('B', len(authresp)) + authresp
    else:  # pragma: no cover - not testing against servers without secure auth (>=5.0)
        data += authresp + b'\0'

    if self.db and self.server_capabilities & CLIENT.CONNECT_WITH_DB:
        if isinstance(self.db, text_type):
            self.db = self.db.encode(self.encoding)
        data += self.db + b'\0'

    if self.server_capabilities & CLIENT.PLUGIN_AUTH:
        data += (plugin_name or b'') + b'\0'

    if self.server_capabilities & CLIENT.CONNECT_ATTRS:
        # Keys, values and the overall attribute block are length-encoded
        # in the protocol. A single unsigned byte (struct 'B') raises
        # struct.error above 255 and is misread by the server for 251-255,
        # so use lenenc_int, which yields the same byte for lengths <= 250.
        attr_parts = []
        for key, value in self._connect_attrs.items():
            key = key.encode('utf-8')
            attr_parts.append(lenenc_int(len(key)) + key)
            value = value.encode('utf-8')
            attr_parts.append(lenenc_int(len(value)) + value)
        connect_attrs = b''.join(attr_parts)
        data += lenenc_int(len(connect_attrs)) + connect_attrs

    self.write_packet(data)
    auth_packet = self._read_packet()

    # if authentication method isn't accepted the first byte
    # will have the octet 254
    if auth_packet.is_auth_switch_request():
        if DEBUG:
            print("received auth switch")
        # https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::AuthSwitchRequest
        auth_packet.read_uint8()  # 0xfe packet identifier
        plugin_name = auth_packet.read_string()
        if self.server_capabilities & CLIENT.PLUGIN_AUTH and plugin_name is not None:
            auth_packet = self._process_auth(plugin_name, auth_packet)
        else:
            # send legacy handshake
            data = _auth.scramble_old_password(self.password, self.salt) + b'\0'
            self.write_packet(data)
            auth_packet = self._read_packet()
    elif auth_packet.is_extra_auth_data():
        if DEBUG:
            print("received extra data")
        # https://dev.mysql.com/doc/internals/en/successful-authentication.html
        if self._auth_plugin_name == "caching_sha2_password":
            auth_packet = _auth.caching_sha2_password_auth(self, auth_packet)
        elif self._auth_plugin_name == "sha256_password":
            auth_packet = _auth.sha256_password_auth(self, auth_packet)
        else:
            # NOTE(review): the plugin name is passed as a second exception
            # argument rather than interpolated into the message; kept as-is
            # so the exception's args stay unchanged for existing callers.
            raise err.OperationalError("Received extra packet for auth method %r", self._auth_plugin_name)

    if DEBUG:
        print("Succeed to auth")