google_cloud_automlops/orchestration/base.py (153 lines of code) (raw):

# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Creates generic component, pipeline, and services objects."""

# pylint: disable=anomalous-backslash-in-string
# pylint: disable=C0103
# pylint: disable=line-too-long

import ast
import inspect
from typing import Callable, List, Optional, TypeVar, Union

import docstring_parser

from google_cloud_automlops.utils.utils import (
    get_function_source_definition,
    read_yaml_file
)
from google_cloud_automlops.utils.constants import (
    BASE_DIR,
    DEFAULT_PIPELINE_NAME,
    GENERATED_DEFAULTS_FILE
)

T = TypeVar('T')


class BaseComponent():
    """The Component object represents a component defined by the user."""

    def __init__(self,
                 func: Optional[Callable] = None,
                 packages_to_install: Optional[List[str]] = None):
        """Initiates a generic Component object created out of a function
        holding all necessary code.

        Args:
            func (Optional[Callable]): The python function to create a component
                from. The function should have type annotations for all its
                arguments, indicating how it is intended to be used (e.g. as an
                input/output Artifact object, a plain parameter, or a path to a
                file). Defaults to None.
            packages_to_install (Optional[List[str]]): A list of optional packages
                to install before executing func. These will always be installed
                at component runtime. Defaults to None.

        Raises:
            ValueError: The parameter `func` is not an existing function.
        """
        # Confirm the input is an existing function
        if not inspect.isfunction(func):
            raise ValueError(f'{func} must be of type function.')

        # Set simple attributes of the component function
        self.func = func
        self.name = func.__name__
        self.packages_to_install = [] if not packages_to_install else packages_to_install

        # Parse the docstring for description
        self.parsed_docstring = docstring_parser.parse(inspect.getdoc(func))
        self.description = self.parsed_docstring.short_description

        # Process and extract details from passed function
        self.parameters = self._get_function_parameters()
        self.return_types = self._get_function_return_types()
        self.src_code = get_function_source_definition(self.func)

        # Instantiate attributes to be set during build
        self.artifact_repo_location = None
        self.artifact_repo_name = None
        self.project_id = None
        self.naming_prefix = None

    def build(self):
        """Instantiates an abstract built method to create and write task files. Also reads in
        defaults file to save default arguments to attributes.

        Raises:
            NotImplementedError: The subclass has not defined the `build` method.
        """
        defaults = read_yaml_file(GENERATED_DEFAULTS_FILE)
        self.artifact_repo_location = defaults['gcp']['artifact_repo_location']
        self.artifact_repo_name = defaults['gcp']['artifact_repo_name']
        self.project_id = defaults['gcp']['project_id']
        self.naming_prefix = defaults['gcp']['naming_prefix']

        raise NotImplementedError('Subclass needs to define this.')

    def _get_function_return_types(self) -> list:
        """Returns a formatted list of function return types.

        Returns:
            list: return value list with types converted to kubeflow spec,
                or None when the function has no return annotation.

        Raises:
            TypeError: If the return type is Optional, or is provided and not a NamedTuple.
        """
        # Extract return type annotation of function
        annotation = inspect.signature(self.func).return_annotation

        # Ensures return type is not optional: stripping Optional yields a
        # different object only when Optional was present.
        if self.maybe_strip_optional_from_annotation(annotation) is not annotation:
            raise TypeError('Return type cannot be Optional.')

        # No annotation provided, return None. Signature.empty is the public
        # alias of inspect's "no annotation" sentinel.
        if annotation is inspect.Signature.empty:
            return None

        # Checks if the function's return type annotation is a valid NamedTuple
        if not (hasattr(annotation, '__annotations__')
                and isinstance(annotation.__annotations__, dict)):
            raise TypeError(f'''Return type hint for function "{self.name}" must be a NamedTuple.''')

        # Creates a dictionary of metadata for each object returned by component
        outputs = []
        for name, type_ in annotation.__annotations__.items():
            metadata = {}
            metadata['name'] = name
            metadata['type'] = type_
            metadata['description'] = None
            outputs.append(metadata)
        return outputs

    def _get_function_parameters(self) -> list:
        """Returns a formatted list of parameters.

        Returns:
            list: Params list with types converted to kubeflow spec.

        Raises:
            TypeError: Parameter type hints are not provided.
        """
        # Extract function parameter names and their descriptions from the function's docstring
        signature = inspect.signature(self.func)
        parameters = list(signature.parameters.values())
        parsed_docstring = docstring_parser.parse(inspect.getdoc(self.func))
        doc_dict = {p.arg_name: p.description for p in parsed_docstring.params}

        # Extract parameter metadata
        parameter_holder = []
        for param in parameters:
            metadata = {}
            metadata['name'] = param.name
            metadata['description'] = doc_dict.get(param.name)
            metadata['type'] = self.maybe_strip_optional_from_annotation(
                param.annotation)
            parameter_holder.append(metadata)
            # Parameter.empty is the public alias of the "no annotation" sentinel
            if metadata['type'] is inspect.Parameter.empty:
                raise TypeError(
                    f'''Missing type hint for parameter "{metadata['name']}". '''
                    f'''Please specify the type for this parameter.''')
        return parameter_holder

    def maybe_strip_optional_from_annotation(self, annotation: T) -> T:
        """Strips 'Optional' from 'Optional[<type>]' if applicable.

        For example::
            Optional[str] -> str
            str -> str
            List[int] -> List[int]

        Args:
            annotation: The original type annotation which may or may not has `Optional`.

        Returns:
            The type inside Optional[] if Optional exists, otherwise the original type.
        """
        # Optional[X] is represented as Union[X, None]; unwrap only that exact shape.
        if getattr(annotation, '__origin__', None) is Union and annotation.__args__[1] is type(None):
            return annotation.__args__[0]
        else:
            return annotation


class BasePipeline():
    """The Pipeline object represents a pipeline defined by the user."""

    def __init__(self,
                 func: Optional[Callable] = None,
                 name: Optional[str] = None,
                 description: Optional[str] = None,
                 comps_dict: dict = None):
        """Initiates a pipeline object created out of a function holding
        all necessary code.

        Args:
            func (Optional[Callable]): The python function to create a pipeline from. The function
                should have type annotations for all its arguments, indicating how it is intended
                to be used (e.g. as an input/output Artifact object, a plain parameter, or a path
                to a file). Defaults to None.
            name (Optional[str]): The name of the pipeline. Defaults to None.
            description (Optional[str]): Short description of what the pipeline does. Defaults to None.
            comps_dict (dict): Dictionary of potential components for pipeline to utilize imported
                as the global held in AutoMLOps.py. Defaults to None.
        """
        # Instantiate and set key pipeline attributes
        self.func = func
        self.func_name = func.__name__
        self.name = DEFAULT_PIPELINE_NAME if not name else name
        self.description = description
        self.src_code = get_function_source_definition(self.func)
        self.comps = self.get_pipeline_components(func, comps_dict)

        # Instantiate attributes to be set at build process
        self.base_image = None
        self.custom_training_job_specs = None
        self.pipeline_params = None
        self.pubsub_topic_name = None
        self.use_ci = None
        self.project_id = None
        self.gs_pipeline_job_spec_path = None
        self.setup_model_monitoring = None

    def build(self,
              pipeline_params: dict,
              custom_training_job_specs: Optional[List] = None):
        """Instantiates an abstract built method to create and write pipeline files. Also reads in
        defaults file to save default arguments to attributes.

        Files created must include:
            1. README.md
            2. Dockerfile
            3. Requirements.txt

        Args:
            pipeline_params (dict): Dictionary containing runtime pipeline parameters.
            custom_training_job_specs (Optional[List]): Specifies the specs to run the training job
                with. Defaults to None.

        Raises:
            NotImplementedError: The subclass has not defined the `build` method.
        """
        # Save parameters as attributes
        self.custom_training_job_specs = custom_training_job_specs
        self.pipeline_params = pipeline_params

        # Extract additional attributes from defaults file
        defaults = read_yaml_file(GENERATED_DEFAULTS_FILE)
        self.project_id = defaults['gcp']['project_id']
        self.gs_pipeline_job_spec_path = defaults['pipelines']['gs_pipeline_job_spec_path']
        self.base_image = defaults['gcp']['base_image']
        self.pubsub_topic_name = defaults['gcp']['pubsub_topic_name']
        self.use_ci = defaults['tooling']['use_ci']
        self.setup_model_monitoring = defaults['gcp']['setup_model_monitoring']

        raise NotImplementedError('Subclass needs to define this.')

    def get_pipeline_components(self,
                                pipeline_func: Callable,
                                comps_dict: dict) -> list:
        """Returns a list of components used within a given pipeline.

        Args:
            pipeline_func (Callable): Pipeline function.
            comps_dict (dict): Dictionary of potential components to use within pipeline.

        Returns:
            List: Components from comps_dict used within the pipeline_func.
        """
        # Tolerate a missing component dictionary (constructor default)
        if comps_dict is None:
            return []

        # Retrieves pipeline source code and parses it into an Abstract Syntax Tree (AST)
        code = inspect.getsource(pipeline_func)
        ast_tree = ast.parse(code)

        # Iterates through AST, finds function calls to components that are in
        # comps_dict. Only simple-name calls (ast.Name) can match a component
        # key; attribute calls (e.g. module.func()) carry no .id and are skipped.
        comps_list = []
        for node in ast.walk(ast_tree):
            if (isinstance(node, ast.Call)
                    and isinstance(node.func, ast.Name)
                    and node.func.id in comps_dict):
                comps_list.append(comps_dict[node.func.id])
        return comps_list


class BaseFuturePipeline():
    """Placeholder for future pipeline object that will be created out of a list of components."""
    def __init__(self, comps: list) -> None:
        # Ordered list of component objects; names are cached for quick lookup.
        self.comps = comps
        self.names = [comp.name for comp in self.comps]


class BaseServices():
    """The Services object will contain code within the services/ dir."""

    def __init__(self) -> None:
        """Instantiates a generic Services object."""
        # Attributes populated from the defaults file during build()
        self.pipeline_storage_path = None
        self.pipeline_job_location = None
        self.pipeline_job_runner_service_account = None
        self.pipeline_job_submission_service_type = None
        self.project_id = None
        self.setup_model_monitoring = None

        # Set directory for files to be written to
        self.submission_service_base_dir = BASE_DIR + 'services/submission_service'

    def build(self):
        """Constructs and writes files related to submission services and model monitoring.

        Files created under AutoMLOps/:
            services/
                submission_service/
                    Dockerfile
                    main.py
                    requirements.txt
            model_monitoring/ (if requested)
                monitor.py
                requirements.txt
        """
        # Extract additional attributes from defaults file
        defaults = read_yaml_file(GENERATED_DEFAULTS_FILE)
        self.pipeline_storage_path = defaults['pipelines']['pipeline_storage_path']
        self.pipeline_job_location = defaults['gcp']['pipeline_job_location']
        self.pipeline_job_runner_service_account = defaults['gcp']['pipeline_job_runner_service_account']
        self.pipeline_job_submission_service_type = defaults['gcp']['pipeline_job_submission_service_type']
        self.project_id = defaults['gcp']['project_id']
        self.setup_model_monitoring = defaults['gcp']['setup_model_monitoring']

        # Build services files (submission_service_base_dir is set in __init__)
        self._build_submission_services()

        # Setup model monitoring
        if self.setup_model_monitoring:
            self._build_monitoring()

    def _build_monitoring(self):
        """Abstract method to create the model monitoring files.

        Raises:
            NotImplementedError: The subclass has not defined the `_build_monitoring` method.
        """
        raise NotImplementedError('Subclass needs to define this')

    def _build_submission_services(self):
        """Abstract method to create the Dockerfile, requirements.txt, and main.py files of the
            services/submission_service directory.

        Raises:
            NotImplementedError: The subclass has not defined the `_build_submission_services` method.
        """
        raise NotImplementedError('Subclass needs to define this.')