def _request_authentication()

in Code/AuroraSLDataAPIDemo/aslbootstrap/pymysql/connections.py [0:0]


    def _request_authentication(self):
        # https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::HandshakeResponse
        if int(self.server_version.split('.', 1)[0]) >= 5:
            self.client_flag |= CLIENT.MULTI_RESULTS

        if self.user is None:
            raise ValueError("Did not specify a username")

        charset_id = charset_by_name(self.charset).id
        if isinstance(self.user, text_type):
            self.user = self.user.encode(self.encoding)

        data_init = struct.pack('<iIB23s', self.client_flag, MAX_PACKET_LEN, charset_id, b'')

        if self.ssl and self.server_capabilities & CLIENT.SSL:
            self.write_packet(data_init)

            self._sock = self.ctx.wrap_socket(self._sock, server_hostname=self.host)
            self._rfile = _makefile(self._sock, 'rb')
            self._secure = True

        data = data_init + self.user + b'\0'

        authresp = b''
        plugin_name = None

        if self._auth_plugin_name == '':
            plugin_name = b''
            authresp = _auth.scramble_native_password(self.password, self.salt)
        elif self._auth_plugin_name == 'mysql_native_password':
            plugin_name = b'mysql_native_password'
            authresp = _auth.scramble_native_password(self.password, self.salt)
        elif self._auth_plugin_name == 'caching_sha2_password':
            plugin_name = b'caching_sha2_password'
            if self.password:
                if DEBUG:
                    print("caching_sha2: trying fast path")
                authresp = _auth.scramble_caching_sha2(self.password, self.salt)
            else:
                if DEBUG:
                    print("caching_sha2: empty password")
        elif self._auth_plugin_name == 'sha256_password':
            plugin_name = b'sha256_password'
            if self.ssl and self.server_capabilities & CLIENT.SSL:
                authresp = self.password + b'\0'
            elif self.password:
                authresp = b'\1'  # request public key
            else:
                authresp = b'\0'  # empty password

        if self.server_capabilities & CLIENT.PLUGIN_AUTH_LENENC_CLIENT_DATA:
            data += lenenc_int(len(authresp)) + authresp
        elif self.server_capabilities & CLIENT.SECURE_CONNECTION:
            data += struct.pack('B', len(authresp)) + authresp
        else:  # pragma: no cover - not testing against servers without secure auth (>=5.0)
            data += authresp + b'\0'

        if self.db and self.server_capabilities & CLIENT.CONNECT_WITH_DB:
            if isinstance(self.db, text_type):
                self.db = self.db.encode(self.encoding)
            data += self.db + b'\0'

        if self.server_capabilities & CLIENT.PLUGIN_AUTH:
            data += (plugin_name or b'') + b'\0'

        if self.server_capabilities & CLIENT.CONNECT_ATTRS:
            connect_attrs = b''
            for k, v in self._connect_attrs.items():
                k = k.encode('utf-8')
                connect_attrs += struct.pack('B', len(k)) + k
                v = v.encode('utf-8')
                connect_attrs += struct.pack('B', len(v)) + v
            data += struct.pack('B', len(connect_attrs)) + connect_attrs

        self.write_packet(data)
        auth_packet = self._read_packet()

        # if authentication method isn't accepted the first byte
        # will have the octet 254
        if auth_packet.is_auth_switch_request():
            if DEBUG: print("received auth switch")
            # https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::AuthSwitchRequest
            auth_packet.read_uint8() # 0xfe packet identifier
            plugin_name = auth_packet.read_string()
            if self.server_capabilities & CLIENT.PLUGIN_AUTH and plugin_name is not None:
                auth_packet = self._process_auth(plugin_name, auth_packet)
            else:
                # send legacy handshake
                data = _auth.scramble_old_password(self.password, self.salt) + b'\0'
                self.write_packet(data)
                auth_packet = self._read_packet()
        elif auth_packet.is_extra_auth_data():
            if DEBUG:
                print("received extra data")
            # https://dev.mysql.com/doc/internals/en/successful-authentication.html
            if self._auth_plugin_name == "caching_sha2_password":
                auth_packet = _auth.caching_sha2_password_auth(self, auth_packet)
            elif self._auth_plugin_name == "sha256_password":
                auth_packet = _auth.sha256_password_auth(self, auth_packet)
            else:
                raise err.OperationalError("Received extra packet for auth method %r", self._auth_plugin_name)

        if DEBUG: print("Succeed to auth")