# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Creates a KFP component, pipeline, and services subclass."""
# pylint: disable=anomalous-backslash-in-string
# pylint: disable=C0103
# pylint: disable=line-too-long
import json
import textwrap
from typing import Callable, List, Optional
try:
from importlib.resources import files as import_files
except ImportError:
    # Fall back to the `importlib_resources` backport
    # (importlib.resources.files requires Python 3.9+)
from importlib_resources import files as import_files
from kfp.dsl import component
from kfp import compiler
from google_cloud_automlops.orchestration.base import BaseComponent, BasePipeline, BaseServices
from google_cloud_automlops.utils.utils import (
execute_process,
make_dirs,
read_file,
read_yaml_file,
render_jinja,
write_and_chmod,
write_file,
write_yaml_file
)
from google_cloud_automlops.utils.constants import (
BASE_DIR,
GENERATED_BUILD_COMPONENTS_SH_FILE,
GENERATED_COMPONENT_BASE,
GENERATED_DEFAULTS_FILE,
GENERATED_LICENSE,
GENERATED_MODEL_MONITORING_MONITOR_PY_FILE,
GENERATED_MODEL_MONITORING_REQUIREMENTS_FILE,
GENERATED_MODEL_MONITORING_SH_FILE,
GENERATED_PARAMETER_VALUES_PATH,
GENERATED_PIPELINE_FILE,
GENERATED_PIPELINE_REQUIREMENTS_FILE,
GENERATED_PIPELINE_RUNNER_FILE,
GENERATED_PIPELINE_SPEC_SH_FILE,
GENERATED_PUBLISH_TO_TOPIC_FILE,
GENERATED_RUN_PIPELINE_SH_FILE,
GENERATED_RUN_ALL_SH_FILE,
KFP_TEMPLATES_PATH,
PINNED_KFP_VERSION,
)


class KFPComponent(BaseComponent):
    """Creates a KFP-specific Component object for Kubeflow Pipelines.

    Args:
        BaseComponent (object): Generic Component object.
    """
def build(self):
"""Constructs files for running and managing Kubeflow pipelines.
"""
defaults = read_yaml_file(GENERATED_DEFAULTS_FILE)
self.artifact_repo_location = defaults['gcp']['artifact_repo_location']
self.artifact_repo_name = defaults['gcp']['artifact_repo_name']
self.project_id = defaults['gcp']['project_id']
self.naming_prefix = defaults['gcp']['naming_prefix']
# Set and create directory for components if it does not already exist
component_dir = BASE_DIR + 'components/' + self.name
comp_yaml_path = component_dir + '/component.yaml'
# Build necessary folders
# TODO: make this only happen for the first component or pull into automlops.py
make_dirs([
component_dir,
BASE_DIR + 'components/component_base/src/'])
compspec_image = (
f'''{self.artifact_repo_location}-docker.pkg.dev/'''
f'''{self.project_id}/'''
f'''{self.artifact_repo_name}/'''
f'''{self.naming_prefix}/'''
f'''components/component_base:latest''')
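        # Resulting image URI has the form (illustrative values):
        #   us-central1-docker.pkg.dev/my-project/my-repo/my-prefix/components/component_base:latest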
# Write component spec
custom_component = component(func=self.func, base_image=compspec_image)
compiler.Compiler().compile(custom_component, comp_yaml_path)
# Write task script to component base
write_file(
filepath=BASE_DIR + 'components/component_base/src/' + self.name + '.py',
text=render_jinja(
template_path=import_files(KFP_TEMPLATES_PATH + '.components.component_base.src') / 'task.py.j2',
generated_license=GENERATED_LICENSE,
custom_code_contents=self.src_code),
mode='w')
component_spec = read_yaml_file(comp_yaml_path)
# Update component_spec to include correct startup command
component_spec['deploymentSpec']['executors'][f'''exec-{self.name.replace('_', '-')}''']['container']['command'] = [
'python3',
f'''/pipelines/component/src/{self.name + '.py'}''']
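        # The updated executor entry then looks roughly like:
        #   exec-train-model:
        #     container:
        #       command: [python3, /pipelines/component/src/train_model.py]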
# Write license and overwrite component spec to the appropriate component.yaml file
write_file(
filepath=comp_yaml_path,
text=GENERATED_LICENSE,
mode='w')
write_yaml_file(
filepath=comp_yaml_path,
contents=component_spec,
mode='a')


class KFPPipeline(BasePipeline):
    """Creates a KFP-specific Pipeline object for Kubeflow Pipelines.

    Args:
        BasePipeline (object): Generic Pipeline object.
    """

def __init__(self,
func: Optional[Callable] = None,
name: Optional[str] = None,
description: Optional[str] = None,
comps_dict: dict = None) -> None:
"""Initiates a KFP pipeline object created out of a function holding all necessary code.
Args:
func (Optional[Callable]): The python function to create a pipeline from. The functio
should have type annotations for all its arguments, indicating how it is intended
to be used (e.g. as an input/output Artifact object, a plain parameter, or a path
to a file). Defaults to None.
name (Optional[str]): The name of the pipeline. Defaults to None.
description (Optional[str]): Short description of what the pipeline does. Defaults to None.
comps_list (dict): Dictionary of potential components for pipeline to utilize imported
as the global held in AutoMLOps.py. Defaults to None.
"""
super().__init__(
func=func,
name=name,
description=description,
comps_dict=comps_dict)
# Create pipeline scaffold attribute, which is an empty pipelines template
# without the DAG definition
self.pipeline_scaffold = (
self._get_pipeline_decorator()
+ self.src_code
+ self._get_compile_step())
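        # For a pipeline named 'training-pipeline', the scaffold roughly reads:
        #   @dsl.pipeline(
        #       name='training-pipeline',
        #   )
        #   <source code of func>
        #   compiler.Compiler().compile(
        #       pipeline_func=<func_name>,
        #       package_path=pipeline_job_spec_path)
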
def build(self,
pipeline_params: dict,
custom_training_job_specs: Optional[List] = None):
"""Constructs files for running and managing Kubeflow pipelines.
Files created under AutoMLOps/:
README.md
scripts/
pipeline_spec/.gitkeep
build_components.sh
build_pipeline_spec.sh
run_pipeline.sh
publish_to_topic.sh
run_all.sh
components/
component_base/Dockerfile
component_base/requirements.txt
pipelines/
pipeline.py
pipeline_runner.py
requirements.txt
runtime_parameters/pipeline_parameter_values.json

        Args:
            pipeline_params (dict): Dictionary containing runtime pipeline parameters.
            custom_training_job_specs (Optional[List]): Specifies the specs to run the training
                job with. Defaults to None.
        """
# Save parameters as attributes
self.custom_training_job_specs = custom_training_job_specs
self.pipeline_params = pipeline_params
# Extract additional attributes from defaults file
defaults = read_yaml_file(GENERATED_DEFAULTS_FILE)
self.project_id = defaults['gcp']['project_id']
self.gs_pipeline_job_spec_path = defaults['pipelines']['gs_pipeline_job_spec_path']
self.base_image = defaults['gcp']['base_image']
self.use_ci = defaults['tooling']['use_ci']
self.pubsub_topic_name = defaults['gcp']['pubsub_topic_name'] if self.use_ci else None
self.setup_model_monitoring = defaults['gcp']['setup_model_monitoring']
# Build necessary folders
make_dirs([
f'{BASE_DIR}scripts/pipeline_spec/',
f'{BASE_DIR}pipelines',
f'{BASE_DIR}pipelines/runtime_parameters/'
])
# README.md: Write description of the contents of the directory
write_file(
filepath=f'{BASE_DIR}README.md',
text=render_jinja(
template_path=import_files(KFP_TEMPLATES_PATH) / 'README.md.j2',
setup_model_monitoring=self.setup_model_monitoring,
use_ci=self.use_ci),
mode='w')
# components/component_base/dockerfile: Write the component base Dockerfile
write_file(
filepath=f'{GENERATED_COMPONENT_BASE}/Dockerfile',
text=render_jinja(
template_path=import_files(KFP_TEMPLATES_PATH + '.components.component_base') / 'Dockerfile.j2',
base_image=self.base_image,
generated_license=GENERATED_LICENSE),
mode='w')
# components/component_base/requirements.txt: Write the component base requirements file
write_file(
filepath=f'{GENERATED_COMPONENT_BASE}/requirements.txt',
text=self._create_component_base_requirements(),
mode='w')
# Save scripts template path
scripts_template_path = import_files(KFP_TEMPLATES_PATH + '.scripts')
# scripts/pipeline_spec/.gitkeep: Write gitkeep to pipeline_spec directory
write_file(
filepath=f'{BASE_DIR}scripts/pipeline_spec/.gitkeep',
text='',
mode='w')
# scripts/build_components.sh: Write script for building components
write_and_chmod(
filepath=GENERATED_BUILD_COMPONENTS_SH_FILE,
text=render_jinja(
template_path=scripts_template_path / 'build_components.sh.j2',
generated_license=GENERATED_LICENSE,
base_dir=BASE_DIR))
# scripts/build_pipeline_spec.sh: Write script for building pipeline specs
write_and_chmod(
filepath=GENERATED_PIPELINE_SPEC_SH_FILE,
text=render_jinja(
template_path=scripts_template_path / 'build_pipeline_spec.sh.j2',
generated_license=GENERATED_LICENSE,
base_dir=BASE_DIR))
        # scripts/run_pipeline.sh: Write script for running the pipeline
write_and_chmod(
filepath=GENERATED_RUN_PIPELINE_SH_FILE,
text=render_jinja(
template_path=scripts_template_path / 'run_pipeline.sh.j2',
generated_license=GENERATED_LICENSE,
base_dir=BASE_DIR))
# scripts/run_all.sh: Write script for running all files
write_and_chmod(
filepath=GENERATED_RUN_ALL_SH_FILE,
text=render_jinja(
template_path=scripts_template_path / 'run_all.sh.j2',
generated_license=GENERATED_LICENSE,
base_dir=BASE_DIR))
# scripts/publish_to_topic.sh: If using CI, write script for publishing to pubsub topic
if self.use_ci:
write_and_chmod(
filepath=GENERATED_PUBLISH_TO_TOPIC_FILE,
text=render_jinja(
template_path=scripts_template_path / 'publish_to_topic.sh.j2',
base_dir=BASE_DIR,
generated_license=GENERATED_LICENSE,
generated_parameter_values_path=GENERATED_PARAMETER_VALUES_PATH,
pubsub_topic_name=self.pubsub_topic_name))
# pipelines/pipeline.py: Generates a Kubeflow pipeline spec from custom components.
components_list = self._get_component_list()
pipeline_scaffold_contents = textwrap.indent(self.pipeline_scaffold, 4 * ' ')
write_file(
filepath=GENERATED_PIPELINE_FILE,
text=render_jinja(
template_path=import_files(KFP_TEMPLATES_PATH + '.pipelines') / 'pipeline.py.j2',
components_list=components_list,
custom_training_job_specs=self.custom_training_job_specs,
generated_license=GENERATED_LICENSE,
pipeline_scaffold_contents=pipeline_scaffold_contents,
project_id=self.project_id),
mode='w')
# pipelines/pipeline_runner.py: Sends a PipelineJob to Vertex AI using pipeline spec.
write_file(
filepath=GENERATED_PIPELINE_RUNNER_FILE,
text=render_jinja(
template_path=import_files(KFP_TEMPLATES_PATH + '.pipelines') / 'pipeline_runner.py.j2',
generated_license=GENERATED_LICENSE),
mode='w')
# pipelines/requirements.txt
write_file(
filepath=GENERATED_PIPELINE_REQUIREMENTS_FILE,
text=render_jinja(
template_path=import_files(KFP_TEMPLATES_PATH + '.pipelines') / 'requirements.txt.j2',
pinned_kfp_version=PINNED_KFP_VERSION),
mode='w')
# pipelines/runtime_parameters/pipeline_parameter_values.json: Provides runtime parameters for the PipelineJob.
self.pipeline_params['gs_pipeline_spec_path'] = self.gs_pipeline_job_spec_path
serialized_params = json.dumps(self.pipeline_params, indent=4)
write_file(BASE_DIR + GENERATED_PARAMETER_VALUES_PATH, serialized_params, 'w')
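        # pipeline_parameter_values.json ends up looking like (illustrative):
        #   {
        #       "bq_table": "my-project.my-dataset.my-table",
        #       "gs_pipeline_spec_path": "gs://<bucket>/pipeline_spec"
        #   }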

    def _get_pipeline_decorator(self) -> str:
        """Constructs the KFP pipeline decorator.

        Returns:
            str: KFP pipeline decorator.
        """
name_str = f'''(\n name='{self.name}',\n'''
desc_str = f''' description='{self.description}',\n''' if self.description else ''
ending_str = ')\n'
return '@dsl.pipeline' + name_str + desc_str + ending_str

    def _get_compile_step(self) -> str:
        """Constructs the compile function call.

        Returns:
            str: Compile function call.
        """
return (
f'\n'
f'compiler.Compiler().compile(\n'
f' pipeline_func={self.func_name},\n'
f' package_path=pipeline_job_spec_path)\n'
f'\n'
)

    def _get_component_list(self) -> list:
        """Gets a list of all the component names in a pipeline.

        Returns:
            list: List of all component names.
        """
return [comp.name for comp in self.comps]

    def _create_component_base_requirements(self) -> str:
        """Creates the contents of the requirements.txt for the component_base directory.

        Optionally infers pip requirements from the Python source files using pipreqs.
        Takes user-inputted requirements and adds some default GCP packages, as well as
        packages that are often missing in setup.py files (e.g. db_dtypes, pyarrow,
        gcsfs, fsspec).

        Returns:
            str: Contents of the requirements.txt for the pipeline job.
        """
reqs_filename = f'{GENERATED_COMPONENT_BASE}/requirements.txt'
default_gcp_reqs = [
'google-cloud-aiplatform',
'google-cloud-appengine-logging',
'google-cloud-audit-log',
'google-cloud-bigquery',
'google-cloud-bigquery-storage',
'google-cloud-bigtable',
'google-cloud-core',
'google-cloud-dataproc',
'google-cloud-datastore',
'google-cloud-dlp',
'google-cloud-firestore',
'google-cloud-kms',
'google-cloud-language',
'google-cloud-logging',
'google-cloud-monitoring',
'google-cloud-notebooks',
'google-cloud-pipeline-components',
'google-cloud-pubsub',
'google-cloud-pubsublite',
'google-cloud-recommendations-ai',
'google-cloud-resource-manager',
'google-cloud-scheduler',
'google-cloud-spanner',
'google-cloud-speech',
'google-cloud-storage',
'google-cloud-tasks',
'google-cloud-translate',
'google-cloud-videointelligence',
'google-cloud-vision',
'db_dtypes',
'pyarrow',
'gcsfs',
'fsspec']
        # Gather user-inputted requirements from each component
user_inp_reqs = []
for comp in self.comps:
user_inp_reqs.extend(comp.packages_to_install)
# Check if user inputted requirements
if user_inp_reqs:
# Remove duplicates
set_of_requirements = set(user_inp_reqs)
else:
# If user did not input requirements, then infer reqs using pipreqs
execute_process(f'python3 -m pipreqs.pipreqs {GENERATED_COMPONENT_BASE} --mode no-pin --force', to_null=True)
pipreqs = read_file(reqs_filename).splitlines()
set_of_requirements = set(pipreqs + default_gcp_reqs)
# Remove empty string
if '' in set_of_requirements:
set_of_requirements.remove('')
# Pin kfp version
if 'kfp' in set_of_requirements:
set_of_requirements.remove('kfp')
set_of_requirements.add(PINNED_KFP_VERSION)
# Stringify and sort
reqs_str = ''.join(r+'\n' for r in sorted(set_of_requirements))
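        # e.g. reqs_str -> 'db_dtypes\nfsspec\ngcsfs\ngoogle-cloud-aiplatform\n...'
        # (sorted and newline-terminated, with kfp pinned to PINNED_KFP_VERSION)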
return reqs_str


class KFPServices(BaseServices):
    """Creates a KFP-specific Services object for Kubeflow Pipelines.

    Args:
        BaseServices (object): Generic Services object.
    """

def _build_monitoring(self):
"""Writes files necessary for implementing model monitoring. Files created are:
scripts/
create_model_monitoring_job.sh
model_monitoring/
monitor.py
requirements.txt
"""
# Writes script create_model_monitoring_job.sh which creates a Vertex AI model monitoring job
write_and_chmod(
filepath=GENERATED_MODEL_MONITORING_SH_FILE,
text=render_jinja(
template_path=import_files(KFP_TEMPLATES_PATH + '.scripts') / 'create_model_monitoring_job.sh.j2',
generated_license=GENERATED_LICENSE,
base_dir=BASE_DIR
))
# Writes monitor.py to create or update a model monitoring job in Vertex AI for a deployed model endpoint
write_file(
filepath=GENERATED_MODEL_MONITORING_MONITOR_PY_FILE,
text=render_jinja(
template_path=import_files(KFP_TEMPLATES_PATH + '.model_monitoring') / 'monitor.py.j2',
generated_license=GENERATED_LICENSE
),
mode='w')
# Writes a requirements.txt to the model_monitoring directory
write_file(
filepath=GENERATED_MODEL_MONITORING_REQUIREMENTS_FILE,
text=render_jinja(template_path=import_files(KFP_TEMPLATES_PATH + '.model_monitoring') / 'requirements.txt.j2'),
mode='w')

    def _build_submission_services(self):
"""Writes the files necessary for utilizing submission services. Files written are:
services/
submission_service/
Dockerfile
main.py
requirements.txt
"""
write_file(
f'{self.submission_service_base_dir}/requirements.txt',
render_jinja(
template_path=import_files(KFP_TEMPLATES_PATH + '.services.submission_service') / 'requirements.txt.j2',
pinned_kfp_version=PINNED_KFP_VERSION,
pipeline_job_submission_service_type=self.pipeline_job_submission_service_type),
'w')
write_file(
f'{self.submission_service_base_dir}/main.py',
render_jinja(
template_path=import_files(KFP_TEMPLATES_PATH + '.services.submission_service') / 'main.py.j2',
generated_license=GENERATED_LICENSE,
pipeline_root=self.pipeline_storage_path,
pipeline_job_location=self.pipeline_job_location,
pipeline_job_runner_service_account=self.pipeline_job_runner_service_account,
pipeline_job_submission_service_type=self.pipeline_job_submission_service_type,
project_id=self.project_id,
setup_model_monitoring=self.setup_model_monitoring),
'w')
write_file(
f'{self.submission_service_base_dir}/Dockerfile',
render_jinja(
template_path=import_files(KFP_TEMPLATES_PATH + '.services.submission_service') / 'Dockerfile.j2',
base_dir=BASE_DIR,
generated_license=GENERATED_LICENSE),
'w')