perfkitbenchmarker/resource.py (258 lines of code) (raw):

# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module containing abstract class for reliable resources. The Resource class wraps unreliable create and delete commands in retry loops and checks for resource existence so that resources can be created and deleted reliably. """ import abc import logging import time from typing import Any, List, TypeVar from absl import flags from perfkitbenchmarker import errors from perfkitbenchmarker import sample from perfkitbenchmarker import vm_util from perfkitbenchmarker.configs import auto_registry FLAGS = flags.FLAGS _RESOURCE_REGISTRY = {} RegisteredType = TypeVar('RegisteredType') ResourceType = type[RegisteredType] # Constants used for annotating resources with timeout metadata: # GCP labels only allow hyphens (-), underscores (_), lowercase characters, and # numbers and International characters. # metadata allow all characters and numbers. METADATA_TIME_FORMAT = '%Y%m%dt%H%M%Sz' TIMEOUT_METADATA_KEY = 'timeout_utc' def GetResourceClass(base_class: ResourceType, **kwargs) -> ResourceType: """Returns the subclass with the corresponding attributes. Args: base_class: The base class of the resource to return (e.g. BaseVirtualMachine). **kwargs: Every attribute/value of the subclass's REQUIRED_ATTRS that were used to register the subclass. Raises: Exception: If no class could be found with matching attributes. """ return auto_registry.GetRegisteredClass( _RESOURCE_REGISTRY, base_class, None, **kwargs ) class AutoRegisterResourceMeta(abc.ABCMeta): """Metaclass which allows resources to automatically be registered.""" # See BaseResource RESOURCE_TYPE: str REQUIRED_ATTRS: List[str] def __init__(cls, name, bases, dct): auto_registry.RegisterClass( _RESOURCE_REGISTRY, cls, cls.REQUIRED_ATTRS, cls.RESOURCE_TYPE ) super().__init__(name, bases, dct) @classmethod def GetAttributes(mcs) -> list[tuple[Any, ...]]: """Override to manually set the attributes for registering the class.""" return [] class BaseResource(metaclass=AutoRegisterResourceMeta): """An object representing a cloud resource. Attributes: created: True if the resource has been created. deleted: True if the resource has been deleted. user_managed: Whether Create() and Delete() should be skipped. restored: True if the resource has been restored. enable_freeze_restore: Whether the resource should use freeze/restore when the option is specified on the command line. Different benchmarks may want different resources to have freeze/restore enabled. create_on_restore_error: Whether to create the resource if there is an issue while restoring. delete_on_freeze_error: Whether to delete the resource if there is an issue while freezing. create_start_time: The start time of the last create. delete_start_time: The start time of the last delete. create_end_time: The end time of the last create. delete_end_time: The end time of the last delete. resource_ready_time: The time when the resource last became ready. metadata: Dictionary of resource metadata. """ # The name of the base class (e.g. BaseVirtualMachine) that will be extended # with auto-registered subclasses. RESOURCE_TYPE = None # A list of attributes that are used to register Resource subclasses # (e.g. CLOUD). REQUIRED_ATTRS = ['CLOUD'] # Timeout in seconds for resource to be ready. READY_TIMEOUT = None # Time between retries. POLL_INTERVAL = 5 def __init__( self, user_managed=False, enable_freeze_restore=False, create_on_restore_error=False, delete_on_freeze_error=False, ): super().__init__() # Class level attributes does not persist after pickle # Copy required attributes to the object for attribute in self.REQUIRED_ATTRS: setattr(self, attribute, getattr(self, attribute, None)) self.created = user_managed self.deleted = user_managed self.user_managed = user_managed self.restored: bool = False self.enable_freeze_restore = enable_freeze_restore self.create_on_restore_error = create_on_restore_error self.delete_on_freeze_error = delete_on_freeze_error # Creation and deletion time information # that we may make use of later. self.create_start_time: float = None self.delete_start_time = None self.create_end_time: float = None self.delete_end_time = None self.resource_ready_time: float = None self.metadata = dict() def GetResourceMetadata(self): """Returns a dictionary of metadata about the resource.""" return self.metadata.copy() @abc.abstractmethod def _Create(self): """Creates the underlying resource.""" raise NotImplementedError() def _Restore(self) -> None: """Restores the underlying resource from a file. This method is required if using Restore() with a resource. """ raise NotImplementedError() def _Freeze(self) -> None: """Freezes the underlying resource to a long-term, sustainable state. This method is required if using Restore() with a resource. """ raise NotImplementedError() def _UpdateTimeout(self, timeout_minutes: int) -> None: """Updates the underlying resource's timeout after a successful freeze. This method is required if using Freeze()/Restore() with a resource. Args: timeout_minutes: The number of minutes past the current time at which the resource should be considered expired. """ raise NotImplementedError() @abc.abstractmethod def _Delete(self): """Deletes the underlying resource. Implementations of this method should be idempotent since it may be called multiple times, even if the resource has already been deleted. """ raise NotImplementedError() def _Exists(self): """Returns true if the underlying resource exists. Supplying this method is optional. If it is not implemented then the default is to assume success when _Create and _Delete do not raise exceptions. """ raise NotImplementedError() def _WaitUntilRunning(self): """Waits until the resource is (or was) running. Supplying this method is optional. Use it when a resource is created using an asynchronous create command and its status is verified as running via repeatedly polling the resource with 'describe' commands. """ pass def _IsReady(self): """Return true if the underlying resource is ready. Supplying this method is optional. Use it when a resource can exist without being ready. If the subclass does not implement it then it just returns true. Returns: True if the resource was ready in time, False if the wait timed out. """ return True def _IsDeleting(self): """Return true if the underlying resource is getting deleted. Supplying this method is optional. Potentially use when the resource has an asynchronous deletion operation to avoid rerunning the deletion command and track the deletion time correctly. If the subclass does not implement it then it just returns false. Returns: True if the resource was being deleted, False if the resource was in a non deleting state. """ return False def _PreDelete(self): """Method that will be called once before _DeleteResource() is called. Supplying this method is optional. If it is supplied, it will be called once, before attempting to delete the resource. It is intended to allow data about the resource to be collected right before it is deleted. """ pass def _PostCreate(self): """Method that will be called once after _CreateResource() is called. Supplying this method is optional. If it is supplied, it will be called once, after the resource is confirmed to exist. It is intended to allow data about the resource to be collected or for the resource to be tagged. """ pass def _CreateDependencies(self): """Method that will be called once before _CreateResource() is called. Supplying this method is optional. It is intended to allow additional flexibility in creating resource dependencies separately from _Create(). """ pass def _DeleteDependencies(self): """Method that will be called once after _DeleteResource() is called. Supplying this method is optional. It is intended to allow additional flexibility in deleting resource dependencies separately from _Delete(). """ pass @vm_util.Retry(retryable_exceptions=(errors.Resource.RetryableCreationError,)) def _CreateResource(self): """Reliably creates the underlying resource.""" if self.created: return # Overwrite create_start_time each time this is called, # with the assumption that multple calls to Create() imply # that the resource was not actually being created on the # backend during previous failed attempts. self.create_start_time = time.time() self._Create() try: if not self._Exists(): raise errors.Resource.RetryableCreationError( 'Creation of %s failed.' % type(self).__name__ ) except NotImplementedError: pass self._WaitUntilRunning() self.created = True self.create_end_time = time.time() @vm_util.Retry( retryable_exceptions=(errors.Resource.RetryableDeletionError,), timeout=3600, ) def _DeleteResource(self): """Reliably deletes the underlying resource.""" # Retryable method which allows waiting for deletion of the resource. @vm_util.Retry( poll_interval=self.POLL_INTERVAL, fuzz=0, timeout=3600, retryable_exceptions=(errors.Resource.RetryableDeletionError,), ) def WaitUntilDeleted(): if self._IsDeleting(): raise errors.Resource.RetryableDeletionError('Not yet deleted') if self.deleted or not self.created: return if not self.delete_start_time: self.delete_start_time = time.time() self._Delete() WaitUntilDeleted() try: if self._Exists(): raise errors.Resource.RetryableDeletionError( 'Deletion of %s failed.' % type(self).__name__ ) except NotImplementedError: pass def Restore(self) -> None: """Restores a resource instead of creating it. Raises: RestoreError: Generic error encompassing restore failures. """ # TODO(user): Add usage lock with labels to prevent multiple # benchmarks from using the same resource concurrently. logging.info('Restoring resource %s.', repr(self)) try: self._Restore() except NotImplementedError as e: raise errors.Resource.RestoreError( f'Class {self.__class__} does not have _Restore() implemented but a ' 'restore file was provided.' ) from e except Exception as e: raise errors.Resource.RestoreError( f'Error restoring resource {repr(self)}' ) from e self.restored = True self.UpdateTimeout(FLAGS.timeout_minutes) def Create(self, restore: bool = False) -> None: """Creates a resource and its dependencies. Args: restore: Whether to restore the resource instead of creating. If enable_freeze_restore is false, this proceeds with creation. Raises: RestoreError: If there is an error while restoring. """ @vm_util.Retry( poll_interval=self.POLL_INTERVAL, fuzz=0, timeout=self.READY_TIMEOUT, retryable_exceptions=(errors.Resource.RetryableCreationError,), ) def WaitUntilReady(): if not self._IsReady(): raise errors.Resource.RetryableCreationError('Not yet ready') if self.user_managed: return if restore and self.enable_freeze_restore: try: self.Restore() return except errors.Resource.RestoreError: logging.exception( 'Encountered an exception while attempting to Restore(). ' 'Creating: %s', self.create_on_restore_error, ) if not self.create_on_restore_error: raise self._CreateDependencies() self._CreateResource() WaitUntilReady() if not self.resource_ready_time: self.resource_ready_time = time.time() self._PostCreate() def Freeze(self) -> None: """Freezes a resource instead of deleting it. Raises: FreezeError: Generic error encompassing freeze failures. """ logging.info('Freezing resource %s.', repr(self)) # Attempt to call freeze, failing if unimplemented. try: self._Freeze() except NotImplementedError as e: raise errors.Resource.FreezeError( f'Class {self.__class__} does not have _Freeze() implemented but ' 'Freeze() was called.' ) from e except Exception as e: raise errors.Resource.FreezeError( f'Error freezing resource {repr(self)}' ) from e # If frozen successfully, attempt to update the timeout. self.restored = False self.UpdateTimeout(FLAGS.persistent_timeout_minutes) def Delete(self, freeze: bool = False) -> None: """Deletes a resource and its dependencies. Args: freeze: Whether to freeze the resource instead of deleting. If enable_freeze_restore is false, this proceeds with deletion. Raises: FreezeError: If there is an error while freezing. """ if self.user_managed: return # Some resources (specifically VMs) lazily compute their metadata rather # than computing it after provisioning and stashing in their metadata dict # or static fields as they are supposed to. # Asking for metadata before deleting it should cache it and make it # available after we tear down resources, which is necessary for attaching # metadata in benchmark_spec.GetSamples() self.GetResourceMetadata() if freeze and self.enable_freeze_restore: try: self.Freeze() return except errors.Resource.FreezeError: logging.exception( 'Encountered an exception while attempting to Freeze(). ' 'Deleting: %s', self.delete_on_freeze_error, ) if not self.delete_on_freeze_error: raise self._PreDelete() self._DeleteResource() self.deleted = True self.delete_end_time = time.time() self._DeleteDependencies() def UpdateTimeout(self, timeout_minutes: int) -> None: """Updates the timeout of the underlying resource. Args: timeout_minutes: The number of minutes past the current time at which the resource should be considered expired. Raises: NotImplementedError: If the resource has not implemented _UpdateTimeout(). """ logging.info('Updating timeout for %s.', repr(self)) try: self._UpdateTimeout(timeout_minutes) except NotImplementedError: logging.exception( 'Class %s does not have _UpdateTimeout() implemented, which is ' 'needed for Freeze(). Please add an implementation.', self.__class__, ) raise def GetSamples(self) -> List[sample.Sample]: """Get samples relating to the provisioning of the resource.""" # This should not be necessary. Resources are responsible to wire their # GetResourceMetadata into publisher.py, but some do not. metadata = self.GetResourceMetadata() metadata['resource_type'] = self.RESOURCE_TYPE metadata['resource_class'] = self.__class__.__name__ samples = [] if self.create_start_time and self.create_end_time: samples.append( sample.Sample( 'Time to Create', self.create_end_time - self.create_start_time, 'seconds', metadata, ) ) if self.create_start_time and self.resource_ready_time: samples.append( sample.Sample( 'Time to Ready', self.resource_ready_time - self.create_start_time, 'seconds', metadata, ) ) if self.delete_start_time and self.delete_end_time: samples.append( sample.Sample( 'Time to Delete', self.delete_end_time - self.delete_start_time, 'seconds', metadata, ) ) return samples def CheckPrerequisites(self) -> None: """Checks preconditions for the resource. Requires resource to be checked in benchmark_spec.CheckPrerequisites() Allows for per-provider validation not available in benchmark.CheckPrerequisites(config). Raises: ValueError: If there is a validation issue. """ pass