perfkitbenchmarker/relational_db.py (430 lines of code) (raw):

# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Relational DB class. This is the base implementation of all relational db. """ import abc from absl import flags from perfkitbenchmarker import background_tasks from perfkitbenchmarker import errors from perfkitbenchmarker import os_types from perfkitbenchmarker import resource from perfkitbenchmarker import sql_engine_utils # TODO(chunla): Move IAAS flag to file flags.DEFINE_string( 'db_engine', None, 'Managed database flavor to use (mysql, postgres)' ) flags.DEFINE_string( 'db_engine_version', None, 'Version of the database flavor selected, e.g. 5.7', ) flags.DEFINE_string( 'database_name', None, 'Name of the database to create. Defaults to pkb-db-[run-uri]', ) flags.DEFINE_string( 'database_username', None, 'Database username. Defaults to pkb-db-user-[run-uri]', ) flags.DEFINE_string( 'database_password', None, 'Database password. Defaults to a random 10-character alpha-numeric string', ) flags.DEFINE_boolean( 'db_high_availability', False, 'Specifies if the database should be high availability', ) flags.DEFINE_string( 'db_high_availability_type', None, 'Specifies high availableity type. (AOAG, FCIS2D, FCIMW)', ) flags.DEFINE_boolean( 'db_backup_enabled', True, 'Whether or not to enable automated backups' ) flags.DEFINE_list( 'db_zone', None, 'zone or region to launch the database in. ' "Defaults to the client vm's zone.", ) flags.DEFINE_list( 'db_replica_zones', [], 'zones to launch the database replicas in. ', ) flags.DEFINE_string( 'client_vm_zone', None, 'zone or region to launch the client in. ' ) flags.DEFINE_integer( 'client_vm_count', None, 'Number of client vms to provision ' ) flags.DEFINE_string('db_machine_type', None, 'Machine type of the database.') flags.DEFINE_integer('db_cpus', None, 'Number of Cpus in the database.') flags.DEFINE_string( 'db_memory', None, 'Amount of Memory in the database. Uses the same format ' 'string as custom machine memory type.', ) flags.DEFINE_integer('db_disk_size', None, 'Size of the database disk in GB.') flags.DEFINE_integer( 'db_num_striped_disks', None, 'The number of data disks to stripe together to form one.', ) flags.DEFINE_string('db_disk_type', None, 'Disk type of the database.') flags.DEFINE_integer( 'db_disk_iops', None, 'Disk IOPs to provision for database disks, if provisioning is applicable' ' or required. IOPs applies to each disk.', ) flags.DEFINE_integer( 'db_disk_throughput', None, 'Disk throughput to provision for database disks, if provisioning is' 'applicable or required. Throughput applies to each disk.', ) flags.DEFINE_integer( 'managed_db_azure_compute_units', None, 'Number of Dtus in the database.' ) flags.DEFINE_string( 'managed_db_tier', None, 'Tier in azure. (Basic, Standard, Premium).' ) flags.DEFINE_string('server_vm_os_type', None, 'OS type of the client vm.') flags.DEFINE_string('client_vm_os_type', None, 'OS type of the client vm.') flags.DEFINE_string( 'server_gcp_min_cpu_platform', None, 'Cpu platform of the server vm.' ) flags.DEFINE_string( 'client_gcp_min_cpu_platform', None, 'CPU platform of the client vm.' ) flags.DEFINE_string( 'client_vm_machine_type', None, 'Machine type of the client vm.' ) flags.DEFINE_integer('client_vm_cpus', None, 'Number of Cpus in the client vm.') flags.DEFINE_string( 'client_vm_memory', None, 'Amount of Memory in the vm. Uses the same format ' 'string as custom machine memory type.', ) flags.DEFINE_integer( 'client_vm_disk_size', None, 'Size of the client vm disk in GB.' ) flags.DEFINE_string('client_vm_disk_type', None, 'Disk type of the client vm.') flags.DEFINE_integer( 'client_vm_disk_iops', None, 'Disk iops of the database on AWS for client vm.', ) flags.DEFINE_boolean( 'use_managed_db', True, 'If true, uses the managed MySql ' 'service for the requested cloud provider. If false, uses ' 'MySql installed on a VM.', ) flags.DEFINE_list( 'db_flags', '', 'Flags to apply to the managed relational database ' "on the cloud that's being used. Example: " 'binlog_cache_size=4096,innodb_log_buffer_size=4294967295', ) flags.DEFINE_integer( 'innodb_buffer_pool_size', None, 'Size of the innodb buffer pool size in GB. ' 'If unset, innodb_buffer_pool_ratio is used.', ) flags.DEFINE_float( 'innodb_buffer_pool_ratio', 0.25, 'Ratio of the innodb buffer pool size to VM memory. ' 'Ignored if innodb_buffer_pool_size is set.', lower_bound=0, upper_bound=1, ) flags.DEFINE_bool( 'mysql_bin_log', False, 'Flag to turn binary logging on. Defaults to False' ) flags.DEFINE_integer( 'innodb_log_file_size', 1000, 'Size of the log file in MB. Defaults to 1000M.', ) flags.DEFINE_integer( 'postgres_shared_buffer_size', None, 'Size of the shared buffer size in GB. ' 'If unset, postgres_shared_buffer_ratio is used.', ) flags.DEFINE_float( 'postgres_shared_buffer_ratio', 0.25, 'Ratio of the shared buffer size to VM memory. ' 'Ignored if postgres_shared_buffer_size is set.', lower_bound=0, upper_bound=1, ) OPTIMIZE_DB_SYSCTL_CONFIG = flags.DEFINE_bool( 'optimize_db_sysctl_config', True, 'Flag to run sysctl optimization for IAAS relational database.', ) SERVER_GCE_NUM_LOCAL_SSDS = flags.DEFINE_integer( 'server_gce_num_local_ssds', 0, 'The number of ssds that should be added to the Server. Note ' 'that the flag only applies for vms that can have a variable ' 'number of local SSDs.', ) SERVER_GCE_SSD_INTERFACE = flags.DEFINE_enum( 'server_gce_ssd_interface', 'SCSI', ['SCSI', 'NVME'], 'The ssd interface for GCE local SSD.', ) ENABLE_DATA_CACHE = flags.DEFINE_bool( 'gcp_db_enable_data_cache', False, 'Whether to enable data cache.' ) FLAGS = flags.FLAGS DEFAULT_MYSQL_PORT = 3306 DEFAULT_POSTGRES_PORT = 5432 DEFAULT_SQLSERVER_PORT = 1433 DEFAULT_PORTS = { sql_engine_utils.MYSQL: DEFAULT_MYSQL_PORT, sql_engine_utils.POSTGRES: DEFAULT_POSTGRES_PORT, sql_engine_utils.SQLSERVER: DEFAULT_SQLSERVER_PORT, } class RelationalDbPropertyNotSetError(Exception): pass class RelationalDbEngineNotFoundError(Exception): pass class UnsupportedError(Exception): pass def GetRelationalDbClass(cloud, is_managed_db, engine): """Get the RelationalDb class corresponding to 'cloud'. Args: cloud: name of cloud to get the class for is_managed_db: is the database self managed or database a a service engine: database engine type Returns: BaseRelationalDb class with the cloud attribute of 'cloud'. """ try: return resource.GetResourceClass( BaseRelationalDb, CLOUD=cloud, IS_MANAGED=is_managed_db, ENGINE=engine ) except errors.Resource.SubclassNotFoundError: pass return resource.GetResourceClass( BaseRelationalDb, CLOUD=cloud, IS_MANAGED=is_managed_db ) def VmsToBoot(vm_groups): # TODO(user): Enable replications. return { name: spec # pylint: disable=g-complex-comprehension for name, spec in vm_groups.items() if name == 'clients' or name == 'default' or name == 'controller' or (not FLAGS.use_managed_db and name == 'servers') or (not FLAGS.use_managed_db and name == 'servers_replicas') } class BaseRelationalDb(resource.BaseResource): """Object representing a relational database Service.""" IS_MANAGED = True RESOURCE_TYPE = 'BaseRelationalDb' REQUIRED_ATTRS = ['CLOUD', 'IS_MANAGED'] def __init__(self, relational_db_spec): """Initialize the managed relational database object. Args: relational_db_spec: spec of the managed database. """ super().__init__( enable_freeze_restore=relational_db_spec.enable_freeze_restore, create_on_restore_error=relational_db_spec.create_on_restore_error, delete_on_freeze_error=relational_db_spec.delete_on_freeze_error, ) self.spec = relational_db_spec self.engine = self.spec.engine self.engine_type = sql_engine_utils.GetDbEngineType(self.engine) self.instance_id = 'pkb-db-instance-' + FLAGS.run_uri self.port = self.GetDefaultPort() self.is_managed_db = self.IS_MANAGED self.endpoint = '' self.replica_endpoint = '' self.client_vms = [] @property def client_vm(self): """Client VM which will drive the database test. This is required by subclasses to perform client-vm network-specific tasks, such as getting information about the VPC, IP address, etc. Raises: RelationalDbPropertyNotSetError: if the client_vm is missing. Returns: The client_vm. """ if not hasattr(self, '_client_vm'): raise RelationalDbPropertyNotSetError('client_vm is not set') return self._client_vm # TODO(user): add support for multiple client VMs @client_vm.setter def client_vm(self, client_vm): self._client_vm = client_vm def _GetDbConnectionProperties( self, ) -> sql_engine_utils.DbConnectionProperties: return sql_engine_utils.DbConnectionProperties( self.spec.engine, self.spec.engine_version, self.endpoint, self.port, self.spec.database_username, self.spec.database_password, ) # TODO(user): Deprecate in favor of client_vms_query_tools @property def client_vm_query_tools(self): return self.client_vms_query_tools[0] @property def client_vms_query_tools(self) -> list[sql_engine_utils.ISQLQueryTools]: connection_properties = self._GetDbConnectionProperties() tools = [ sql_engine_utils.GetQueryToolsByEngine(vm, connection_properties) for vm in self.client_vms ] return [t for t in tools if t is not None] @property def client_vm_query_tools_for_replica(self): """Query tools to make custom queries on replica.""" if not self.replica_endpoint: raise ValueError('Database does not have replica.') connection_properties = sql_engine_utils.DbConnectionProperties( self.spec.engine, self.spec.engine_version, self.replica_endpoint, self.port, self.spec.database_username, self.spec.database_password, ) return sql_engine_utils.GetQueryToolsByEngine( self.client_vm, connection_properties ) def SetVms(self, vm_groups): self.client_vms = vm_groups[ 'clients' if 'clients' in vm_groups else 'default' ] # TODO(user): Remove this after moving to multiple client VMs. self.client_vm = self.client_vms[0] @property def port(self): """Port (int) on which the database server is listening.""" if not hasattr(self, '_port'): raise RelationalDbPropertyNotSetError('port not set') return self._port @port.setter def port(self, port): self._port = int(port) def GetResourceMetadata(self): """Returns a dictionary of metadata. Child classes can extend this if needed. Raises: RelationalDbPropertyNotSetError: if any expected metadata is missing. """ metadata = { 'zone': self.spec.db_spec.zone, 'disk_type': self.spec.db_disk_spec.disk_type, 'disk_size': self.spec.db_disk_spec.disk_size, 'engine': self.spec.engine, 'high_availability': self.spec.high_availability, 'backup_enabled': self.spec.backup_enabled, 'engine_version': self.spec.engine_version, 'client_vm_zone': self.spec.vm_groups['clients'].vm_spec.zone, 'use_managed_db': self.is_managed_db, 'instance_id': self.instance_id, 'client_vm_disk_type': self.spec.vm_groups[ 'clients' ].disk_spec.disk_type, 'client_vm_disk_size': self.spec.vm_groups[ 'clients' ].disk_spec.disk_size, } if ( hasattr(self.spec.db_spec, 'machine_type') and self.spec.db_spec.machine_type ): metadata.update({ 'machine_type': self.spec.db_spec.machine_type, }) elif hasattr(self.spec.db_spec, 'cpus') and ( hasattr(self.spec.db_spec, 'memory') ): metadata.update({ 'cpus': self.spec.db_spec.cpus, }) metadata.update({ 'memory': self.spec.db_spec.memory, }) elif hasattr(self.spec.db_spec, 'tier') and ( hasattr(self.spec.db_spec, 'compute_units') ): metadata.update({ 'tier': self.spec.db_spec.tier, }) metadata.update({ 'compute_units': self.spec.db_spec.compute_units, }) else: raise RelationalDbPropertyNotSetError( 'Machine type of the database must be set.' ) if ( hasattr(self.spec.vm_groups['clients'].vm_spec, 'machine_type') and self.spec.vm_groups['clients'].vm_spec.machine_type ): metadata.update({ 'client_vm_machine_type': self.spec.vm_groups[ 'clients' ].vm_spec.machine_type, }) elif hasattr(self.spec.vm_groups['clients'].vm_spec, 'cpus') and ( hasattr(self.spec.vm_groups['clients'].vm_spec, 'memory') ): metadata.update({ 'client_vm_cpus': self.spec.vm_groups['clients'].vm_spec.cpus, }) metadata.update({ 'client_vm_memory': self.spec.vm_groups['clients'].vm_spec.memory, }) else: raise RelationalDbPropertyNotSetError( 'Machine type of the client VM must be set.' ) if FLAGS.db_flags: metadata.update({ 'db_flags': FLAGS.db_flags, }) if hasattr(self.spec, 'db_tier'): metadata.update({ 'db_tier': self.spec.db_tier, }) return metadata @abc.abstractmethod def GetDefaultEngineVersion(self, engine): """Return the default version (for PKB) for the given database engine. Args: engine: name of the database engine Returns: default version as a string for the given engine. """ def _SetEndpoint(self): """Set the DB endpoint for this instance during _PostCreate.""" pass def _PostCreate(self): if self.spec.db_flags: self._ApplyDbFlags() self._SetEndpoint() background_tasks.RunThreaded( lambda client_query_tools: client_query_tools.InstallPackages(), self.client_vms_query_tools, ) # Add a traceroute command to the client VM to ensure that the database is # accessible. This also informs the baseline latency of the network. if self.client_vm.OS_TYPE in os_types.LINUX_OS_TYPES and self.endpoint: self.client_vm.RemoteCommand( 'sudo apt-get install -y tcptraceroute', ignore_failure=True ) self.client_vm.RemoteCommand( f'tcptraceroute {self.endpoint} {self.port}', ignore_failure=True ) def UpdateCapacityForLoad(self) -> None: """Updates infrastructure to the correct capacity for loading.""" pass def UpdateCapacityForRun(self) -> None: """Updates infrastructure to the correct capacity for running.""" pass def _ApplyDbFlags(self): """Apply Flags on the database.""" raise NotImplementedError( 'Managed Db flags is not supported for %s' % type(self).__name__ ) def GetDefaultPort(self): """Returns default port for the db engine from the spec.""" engine = sql_engine_utils.GetDbEngineType(self.spec.engine) if engine not in DEFAULT_PORTS: raise NotImplementedError( 'Default port not specified for engine {}'.format(engine) ) return DEFAULT_PORTS[engine] def CreateDatabase(self, database_name: str) -> tuple[str, str]: """Creates the database.""" return self.client_vms_query_tools[0].CreateDatabase(database_name) def DeleteDatabase(self, database_name: str) -> tuple[str, str]: """Deletes the database.""" return self.client_vms_query_tools[0].DeleteDatabase(database_name) def Failover(self): """Fail over the database. Throws exception if not high available.""" if not self.spec.high_availability: raise RuntimeError( "Attempt to fail over a database that isn't marked as high available" ) self._FailoverHA() def _FailoverHA(self): """Fail over from master to replica.""" raise NotImplementedError('Failover is not implemented.') def RestartDatabase(self): """Restarts all the database services in the benchmark. Args: None Returns: none """ raise NotImplementedError('Restart database is not implemented.')