perfkitbenchmarker/sql_engine_utils.py (404 lines of code) (raw):

# Copyright 2021 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities to support multiple engines."""

import abc
import dataclasses
import logging
import time
import timeit
from typing import Any, Dict, List, Tuple, Union

from absl import flags
from perfkitbenchmarker import sample
from perfkitbenchmarker import virtual_machine

FLAGS = flags.FLAGS

# Unit attached to timing samples produced by the query tools.
SECOND = 'seconds'

# Engine names that may appear as `engine` in a database spec.
FLEXIBLE_SERVER_MYSQL = 'flexible-server-mysql'
FLEXIBLE_SERVER_POSTGRES = 'flexible-server-postgres'
TIMESCALEDB = 'timescaledb'
OMNI = 'omni'
MYSQL = 'mysql'
MARIADB = 'mariadb'
POSTGRES = 'postgres'
AURORA_POSTGRES = 'aurora-postgresql'
AURORA_MYSQL = 'aurora-mysql'
SQLSERVER = 'sqlserver'
SQLSERVER_EXPRESS = 'sqlserver-ex'
SQLSERVER_ENTERPRISE = 'sqlserver-ee'
SQLSERVER_STANDARD = 'sqlserver-se'
SPANNER_GOOGLESQL = 'spanner-googlesql'
SPANNER_POSTGRES = 'spanner-postgres'
ALLOYDB = 'alloydb-postgresql'

ALL_ENGINES = [
    TIMESCALEDB,
    OMNI,
    MARIADB,
    MYSQL,
    POSTGRES,
    AURORA_POSTGRES,
    AURORA_MYSQL,
    SQLSERVER,
    SQLSERVER_EXPRESS,
    SQLSERVER_ENTERPRISE,
    SQLSERVER_STANDARD,
    SPANNER_GOOGLESQL,
    SPANNER_POSTGRES,
    FLEXIBLE_SERVER_MYSQL,
    FLEXIBLE_SERVER_POSTGRES,
    ALLOYDB,
]

# Base engine types accepted as-is by GetDbEngineType.
ENGINE_TYPES = [MYSQL, POSTGRES, SQLSERVER]

# db_engine values AWS uses for SQL Server editions. Reuse the named constants
# above so the two spellings cannot drift apart.
AWS_SQLSERVER_ENGINES = [
    SQLSERVER_ENTERPRISE,
    SQLSERVER_STANDARD,
    SQLSERVER_EXPRESS,
    'sqlserver-web',
]
AWS_AURORA_POSTGRES_ENGINE = AURORA_POSTGRES
AWS_AURORA_MYSQL_ENGINE = AURORA_MYSQL

# Fallback key in a per-engine command dict passed to
# ISQLQueryTools.IssueSqlCommand.
DEFAULT_COMMAND = 'default'

SQLSERVER_AOAG_NAME = 'pkb-aoag'
SQLSERVER_AOAG_DB_NAME = 'pkb-aoag-db'

# Upper bound on Spanner sessions PGAdapter may open; also used to derive the
# PGAdapter numChannels setting (one channel per 100 sessions).
_PGADAPTER_MAX_SESSIONS = 5000
# Seconds to wait after stopping or starting PGAdapter so connections have time
# to clean up / come up before the next command runs.
_PGADAPTER_CONNECT_WAIT_SEC = 60


# Query Related tools
@dataclasses.dataclass
class DbConnectionProperties:
  """Data class to store attributes needed for connecting to a database.

  Attributes:
    engine: Engine name; mapped to an engine type by GetDbEngineType.
    engine_version: Engine version string (e.g. '5.7' or '8.0' for MySQL).
    endpoint: Host the client connects to.
    port: Database port. NOTE: the CLI tools in this module currently use
      fixed default ports (5432 / 3306) instead of reading this field.
    database_username: User to connect as.
    database_password: Password for database_username.
    instance_name: Instance name; read by SpannerPostgresCliQueryTools.
    database_name: Default database; read by SpannerPostgresCliQueryTools.
    project: Cloud project; read by SpannerPostgresCliQueryTools.
  """

  engine: str
  engine_version: str
  endpoint: str
  port: int
  database_username: str
  database_password: str
  instance_name: str | None = None
  database_name: str | None = None
  project: str | None = None


class ISQLQueryTools(metaclass=abc.ABCMeta):
  """Interface for issuing SQL queries from a client VM.

  Attributes:
    vm: VM to issue commands with.
    connection_properties: Connection properties of the database.
  """

  # Key used to pick an engine-specific entry from a command dict passed to
  # IssueSqlCommand. Concrete subclasses must override it.
  ENGINE_TYPE: str | None = None

  def __init__(
      self,
      vm: virtual_machine.VirtualMachine,
      connection_properties: DbConnectionProperties,
  ):
    """Initialize ISQLQueryTools class."""
    self.vm = vm
    self.connection_properties = connection_properties

  def TimeQuery(
      self,
      database_name: str,
      query: str,
      is_explain: bool = False,
      suppress_stdout: bool = False,
  ) -> Tuple[Any, Any, str]:
    """Runs a query and measures its wall-clock time.

    Args:
      database_name: Database to run the query against.
      query: SQL query text.
      is_explain: If True, prefix the query with GetExplainPrefix().
      suppress_stdout: If True, discard the command's output on the VM.

    Returns:
      A tuple (stdout, stderr, run_time) where run_time is the elapsed seconds
      as a string, or '-1' if the command produced error output.
    """
    if is_explain:
      query = self.GetExplainPrefix() + query

    start = timeit.default_timer()
    stdout_, error_ = self.IssueSqlCommand(
        query,
        database_name=database_name,
        suppress_stdout=suppress_stdout,
        # Allow long-running queries: 300 minutes.
        timeout=60 * 300,
    )
    end = timeit.default_timer()

    run_time = str(end - start)
    if error_:
      logging.info('Queries finished with error %s', error_)
      run_time = '-1'
    return stdout_, error_, run_time

  def SamplesFromQueriesWithExplain(
      self,
      database_name: str,
      queries: Dict[str, str],
      metadata: Dict[str, Any],
  ) -> List[sample.Sample]:
    """Runs each query with the EXPLAIN prefix and records its run time.

    Args:
      database_name: Database to run the queries against.
      queries: Mapping of query name to SQL text.
      metadata: Metadata attached to every sample.

    Returns:
      One sample named 'Query <name>' per query holding its run time.
    """
    results = []
    for name, query_text in queries.items():
      execution_plan, _, run_time = self.TimeQuery(
          database_name, query_text, is_explain=True
      )
      logging.info('Execution Plan for Query %s: %s', name, execution_plan)
      results.append(
          sample.Sample('Query %s' % name, run_time, SECOND, metadata)
      )
    return results

  def SamplesFromQueriesAfterRunningExplain(
      self,
      database_name: str,
      queries: Dict[str, str],
      metadata: Dict[str, Any],
  ) -> List[sample.Sample]:
    """Run queryset once to prewarm, then run the queryset again for timing.

    The first pass runs every query with the EXPLAIN prefix and logs the plan;
    the second pass runs the plain queries (output suppressed) and times them.

    Args:
      database_name: Database to run the queries against.
      queries: Mapping of query name to SQL text.
      metadata: Metadata attached to every sample.

    Returns:
      One sample named 'Query <name>' per query holding the second-pass time.
    """
    for name, query_text in queries.items():
      execution_plan, _, _ = self.TimeQuery(
          database_name, query_text, is_explain=True
      )
      logging.info('Execution Plan for Query %s: %s', name, execution_plan)

    results = []
    for name, query_text in queries.items():
      _, _, run_time = self.TimeQuery(
          database_name, query_text, is_explain=False, suppress_stdout=True
      )
      results.append(
          sample.Sample('Query %s' % name, run_time, SECOND, metadata)
      )
    return results

  def IssueSqlCommand(
      self,
      command: Union[str, Dict[str, str]],
      database_name: str = '',
      superuser: bool = False,
      session_variables: Union[str, List[str]] = '',
      timeout: int | None = None,
      ignore_failure: bool = False,
      suppress_stdout: bool = False,
  ):
    """Issues a SQL command through the engine's CLI on the client VM.

    Args:
      command: SQL text, or a dict mapping engine type to SQL text. For a dict,
        the entry for ENGINE_TYPE is used if present, otherwise the entry for
        DEFAULT_COMMAND.
      database_name: Database to run against; engine default if empty.
      superuser: If True, run the client command with sudo.
      session_variables: Statements to run before the command. Only some
        engines support this; others raise NotImplementedError.
      timeout: Timeout passed through to vm.RemoteCommand.
      ignore_failure: Passed through to vm.RemoteCommand.
      suppress_stdout: If True, redirect both stdout and stderr to /dev/null.

    Returns:
      The result of vm.RemoteCommand (unpacked as (stdout, stderr) by callers
      in this module).

    Raises:
      ValueError: If ENGINE_TYPE is not set.
      KeyError: If command is a dict with neither an ENGINE_TYPE nor a
        DEFAULT_COMMAND entry.
      TypeError: If command is neither a str nor a dict.
    """
    if self.ENGINE_TYPE is None:
      raise ValueError('ENGINE_TYPE is None')

    # Resolve the SQL text for this engine.
    if isinstance(command, dict):
      if self.ENGINE_TYPE in command:
        command_string = command[self.ENGINE_TYPE]
      else:
        command_string = command[DEFAULT_COMMAND]
    elif isinstance(command, str):
      command_string = command
    else:
      # Fail loudly instead of issuing a command built from None.
      raise TypeError(
          'command must be a str or dict, got %s' % type(command).__name__
      )

    command_string = self.MakeSqlCommand(
        command_string,
        database_name=database_name,
        session_variables=session_variables,
    )
    if superuser:
      command_string = 'sudo ' + command_string
    if suppress_stdout:
      command_string = command_string + ' >/dev/null 2>&1'

    return self.vm.RemoteCommand(
        command_string, timeout=timeout, ignore_failure=ignore_failure
    )

  @abc.abstractmethod
  def InstallPackages(self) -> None:
    """Installs packages required for making queries."""

  @abc.abstractmethod
  def GetConnectionString(self, **kwargs):
    """Get connection string."""

  @abc.abstractmethod
  def MakeSqlCommand(
      self,
      command: str,
      database_name: str = '',
      session_variables: Union[str, List[str]] = '',
  ):
    """Builds the shell command that runs `command` with the engine's CLI."""

  def GetExplainPrefix(self) -> str:
    """Returns the prefix for explain query."""
    return 'EXPLAIN '

  def RunSqlScript(
      self, file_path: str, database_name: str = ''
  ) -> Tuple[str, str]:
    """Run a sql command through a file.

    The file could have multiple commands. RunSqlScript runs the sql file from
    the file_path using the database_name. Failure in one command might cause
    failure in subsequent commands.

    Args:
      file_path: The local path from the machine.
      database_name: Name of the database. Uses the master database or the
        default database if nothing is supplied.

    Returns:
      A tuple of standard output and standard error.

    Raises:
      NotImplementedError: Unless overridden by a subclass.
    """
    raise NotImplementedError('Running from a file is not currently supported.')

  def CreateDatabase(self, database_name: str) -> tuple[str, str]:
    """Creates the specified database."""
    raise NotImplementedError()

  def DeleteDatabase(self, database_name: str) -> tuple[str, str]:
    """Deletes the specified database."""
    raise NotImplementedError()


class PostgresCliQueryTools(ISQLQueryTools):
  """SQL Query class to issue postgres related query via psql."""

  ENGINE_TYPE = POSTGRES

  # The default database in postgres
  DEFAULT_DATABASE = POSTGRES

  def InstallPackages(self):
    """Installs packages required for making queries."""
    self.vm.Install('postgres_client')

  def _BuildPsqlCommand(
      self,
      connection_string: str,
      command: str,
      session_variables: Union[str, List[str]],
  ) -> str:
    """Builds `psql <conn> [-c "<var>" ...] -c "<command>"`."""
    # A bare string is a single statement; iterating it would emit one -c per
    # character.
    if isinstance(session_variables, str):
      session_variables = [session_variables] if session_variables else []
    parts = ['psql %s ' % connection_string]
    parts.extend('-c "%s" ' % variable for variable in session_variables or [])
    parts.append('-c "%s"' % command)
    return ''.join(parts)

  def MakeSqlCommand(
      self,
      command: str,
      database_name: str = '',
      session_variables: Union[str, List[str]] = '',
  ):
    """Make Sql Command; session_variables run before command via extra -c."""
    if not database_name:
      database_name = self.DEFAULT_DATABASE
    return self._BuildPsqlCommand(
        self.GetConnectionString(database_name), command, session_variables
    )

  def GetConnectionString(self, database_name='', endpoint=''):
    """Returns a quoted libpq key/value connection string for psql."""
    if not database_name:
      database_name = self.DEFAULT_DATABASE
    if not endpoint:
      endpoint = self.connection_properties.endpoint
    return "'host={} user={} password={} dbname={}'".format(
        endpoint,
        self.connection_properties.database_username,
        self.connection_properties.database_password,
        database_name,
    )

  def GetDSNConnectionString(self, database_name=''):
    """Returns a postgresql:// URI (port fixed at 5432)."""
    if not database_name:
      database_name = self.DEFAULT_DATABASE
    return (
        f'postgresql://{self.connection_properties.database_username}:'
        + f'{self.connection_properties.database_password}@'
        + f'{self.connection_properties.endpoint}:5432/{database_name}'
    )

  def GetSysbenchConnectionString(self):
    """Returns sysbench --pgsql-* connection flags (port fixed at 5432)."""
    return (
        '--pgsql-host={} --pgsql-user={} --pgsql-password="{}" '
        '--pgsql-port=5432'
    ).format(
        self.connection_properties.endpoint,
        self.connection_properties.database_username,
        self.connection_properties.database_password,
    )

  def GetExplainPrefix(self) -> str:
    """Adding hints to increase the verboseness of the explain."""
    return 'EXPLAIN (ANALYZE, BUFFERS, TIMING, SUMMARY, VERBOSE) '

  def CreateDatabase(self, database_name: str) -> tuple[str, str]:
    """See base class."""
    return self.IssueSqlCommand(f'create database {database_name}')

  def DeleteDatabase(self, database_name: str) -> tuple[str, str]:
    """See base class."""
    return self.IssueSqlCommand(f'drop database {database_name}')


class SpannerPostgresCliQueryTools(PostgresCliQueryTools):
  """SQL Query class to issue Spanner postgres queries (subset of postgres).

  Queries go through a PGAdapter process started on the client VM by Connect.
  """

  ENGINE_TYPE = SPANNER_POSTGRES

  # The default database in postgres
  DEFAULT_DATABASE = POSTGRES

  def Connect(
      self, sessions: int | None = None, database_name: str = ''
  ) -> None:
    """Connects to the DB using PGAdapter.

    See https://cloud.google.com/spanner/docs/sessions for a description
    of how session count affects performance.

    Args:
      sessions: The number of Spanner minSessions to set for the client.
      database_name: Database to connect; defaults to
        connection_properties.database_name.
    """
    # Kill whatever currently holds port 5432 (e.g. a previous PGAdapter).
    self.vm.RemoteCommand('fuser -k 5432/tcp', ignore_failure=True)
    # Connections need some time to cleanup, or the run command fails.
    time.sleep(_PGADAPTER_CONNECT_WAIT_SEC)

    sessions_arg = ''
    if sessions:
      sessions_arg = (
          f'-r "minSessions={sessions};'
          f'maxSessions={_PGADAPTER_MAX_SESSIONS};'
          f'numChannels={_PGADAPTER_MAX_SESSIONS // 100}"'
      )

    properties = self.connection_properties
    database_name = database_name or properties.database_name
    command = (
        'java -jar pgadapter.jar '
        '-dir /tmp '
        f'-p {properties.project} '
        f'-i {properties.instance_name} '
        f'-d {database_name} '
        f'{sessions_arg} '
        '&> /dev/null &'
    )
    self.vm.RemoteCommand(command)
    # Connections need some time to startup, or the run command fails.
    time.sleep(_PGADAPTER_CONNECT_WAIT_SEC)

  def InstallPackages(self) -> None:
    """Installs PGAdapter, starts it, and installs the postgres client."""
    self.vm.Install('pgadapter')
    self.Connect()
    self.vm.Install('postgres_client')

  def MakeSqlCommand(
      self,
      command: str,
      database_name: str = '',
      session_variables: Union[str, List[str]] = '',
  ) -> str:
    """Makes Sql Command.

    database_name is ignored: the database is fixed when PGAdapter is started
    by Connect.
    """
    return self._BuildPsqlCommand(
        self.GetConnectionString(), command, session_variables
    )

  def GetConnectionString(self, database_name: str = '', endpoint='') -> str:
    """Targets the local PGAdapter; both arguments are ignored."""
    return '-h localhost'

  def GetSysbenchConnectionString(self) -> str:
    """Points sysbench at /tmp, the -dir passed to PGAdapter in Connect."""
    return '--pgsql-host=/tmp'


class MysqlCliQueryTools(ISQLQueryTools):
  """SQL Query class to issue Mysql related query via the mysql CLI."""

  ENGINE_TYPE = MYSQL

  def InstallPackages(self):
    """Installs the mysql client package matching engine_version.

    Raises:
      ValueError: If engine_version is not 5.6, 5.7 or 8.0.
    """
    version = self.connection_properties.engine_version
    if version == '5.6' or version.startswith('5.6.'):
      mysql_name = 'mysqlclient56'
    elif version.startswith(('5.7', '8.0')):
      mysql_name = 'mysqlclient'
    else:
      raise ValueError(
          'Invalid database engine version: %s. Only 5.6, 5.7 '
          'and 8.0 are supported.' % version
      )
    self.vm.Install(mysql_name)

  def MakeSqlCommand(
      self,
      command: str,
      database_name: str = '',
      session_variables: Union[str, List[str]] = '',
  ):
    """See base class."""
    if session_variables:
      raise NotImplementedError(
          'Session variables is currently not supported in mysql'
      )
    mysql_command = 'mysql %s ' % self.GetConnectionString()
    if database_name:
      mysql_command += database_name + ' '
    return mysql_command + '-e "%s"' % command

  def GetConnectionString(self, endpoint=''):
    """Returns mysql CLI connection flags (port fixed at 3306).

    Args:
      endpoint: Host to connect to; defaults to connection_properties.endpoint.
    """
    if not endpoint:
      endpoint = self.connection_properties.endpoint
    # NOTE(review): the password is passed unquoted on the command line;
    # passwords containing shell metacharacters will break this command.
    return '-h {} -P 3306 -u {} -p{}'.format(
        endpoint,
        self.connection_properties.database_username,
        self.connection_properties.database_password,
    )

  def GetSysbenchConnectionString(self):
    """Returns sysbench --mysql-* connection flags."""
    return ('--mysql-host={} --mysql-user={} --mysql-password="{}" ').format(
        self.connection_properties.endpoint,
        self.connection_properties.database_username,
        self.connection_properties.database_password,
    )

  def CreateDatabase(self, database_name: str) -> tuple[str, str]:
    """See base class."""
    return self.IssueSqlCommand(f'create database {database_name}')

  def DeleteDatabase(self, database_name: str) -> tuple[str, str]:
    """See base class."""
    return self.IssueSqlCommand(f'drop database {database_name}')


class SqlServerCliQueryTools(ISQLQueryTools):
  """SQL Query class to issue SQL server related query via sqlcmd."""

  ENGINE_TYPE = SQLSERVER

  def InstallPackages(self):
    """Installs packages required for making queries."""
    self.vm.Install('mssql_tools')

  def _SqlcmdConnectionArgs(self) -> str:
    """Returns the shared sqlcmd server/credential flags (trailing space)."""
    return '-C -S %s -U %s -P %s ' % (
        self.connection_properties.endpoint,
        self.connection_properties.database_username,
        self.connection_properties.database_password,
    )

  def MakeSqlCommand(
      self,
      command: str,
      database_name: str = '',
      session_variables: Union[str, List[str]] = '',
  ):
    """See base class."""
    if session_variables:
      raise NotImplementedError(
          'Session variables is currently not supported in sqlserver'
      )
    sqlserver_command = 'sqlcmd ' + self._SqlcmdConnectionArgs()
    if database_name:
      sqlserver_command += '-d %s ' % database_name
    return sqlserver_command + '-Q "%s"' % command

  def GetConnectionString(self, database_name='', endpoint=''):
    """Not supported for SQL Server."""
    raise NotImplementedError('Connection string currently not supported')

  def RunSqlScript(
      self, file_path: str, database_name: str = ''
  ) -> Tuple[str, str]:
    """Runs Sql script from sqlcmd.

    This method executes the commands in a sql file using sqlcmd with the -i
    option enabled.

    Args:
      file_path: The local path from the machine.
      database_name: Name of the database.

    Returns:
      A tuple of stdout and stderr from running the command.
    """
    sqlserver_command = (
        '/opt/mssql-tools/bin/sqlcmd ' + self._SqlcmdConnectionArgs()
    )
    if database_name:
      sqlserver_command += '-d %s ' % database_name
    sqlserver_command += '-i ' + file_path
    return self.vm.RemoteCommand(sqlserver_command)


# Helper functions for this module
def GetDbEngineType(db_engine: str) -> str:
  """Converts the engine type from db_engine.

  The same engine type can have multiple names because of differences in
  cloud provider or versions.

  Args:
    db_engine: db_engine defined in the spec

  Returns:
    Engine type in string.

  Raises:
    TypeError: If db_engine does not map to a supported engine type.
  """
  # AWS uses sqlserver-ee/-se/-ex/-web as db_engine for SQL Server.
  if db_engine in AWS_SQLSERVER_ENGINES:
    return SQLSERVER
  # Postgres-compatible engines are driven with the postgres tooling.
  if db_engine in (
      AWS_AURORA_POSTGRES_ENGINE,
      FLEXIBLE_SERVER_POSTGRES,
      ALLOYDB,
      OMNI,
      TIMESCALEDB,
  ):
    return POSTGRES
  # MySQL-compatible engines are driven with the mysql tooling.
  if db_engine in (AWS_AURORA_MYSQL_ENGINE, FLEXIBLE_SERVER_MYSQL, MARIADB):
    return MYSQL
  # Spanner dialects are engine types of their own.
  if db_engine in (SPANNER_POSTGRES, SPANNER_GOOGLESQL):
    return db_engine
  if db_engine not in ENGINE_TYPES:
    raise TypeError('Unsupported engine type', db_engine)
  return db_engine


def GetQueryToolsByEngine(vm, connection_properties):
  """Returns the query tools to use for the engine.

  Args:
    vm: VM the tools issue commands with.
    connection_properties: DbConnectionProperties of the database.

  Returns:
    An ISQLQueryTools instance for the engine's type.

  Raises:
    TypeError: From GetDbEngineType for unknown engines.
    ValueError: If the engine type has no query tools (e.g. Spanner GoogleSQL).
  """
  engine_type = GetDbEngineType(connection_properties.engine)
  # MariaDB needs no entry: GetDbEngineType maps it to MYSQL.
  tools_by_engine_type = {
      MYSQL: MysqlCliQueryTools,
      POSTGRES: PostgresCliQueryTools,
      SQLSERVER: SqlServerCliQueryTools,
      SPANNER_POSTGRES: SpannerPostgresCliQueryTools,
  }
  tools_class = tools_by_engine_type.get(engine_type)
  if tools_class is None:
    raise ValueError('Engine not supported: %s' % engine_type)
  return tools_class(vm, connection_properties)