in google_cloud_automlops/orchestration/kfp.py [0:0]
def build(self,
          pipeline_params: dict,
          custom_training_job_specs: Optional[List] = None):
    """Constructs files for running and managing Kubeflow pipelines.

    Files created under AutoMLOps/:
        README.md
        scripts/
            pipeline_spec/.gitkeep
            build_components.sh
            build_pipeline_spec.sh
            run_pipeline.sh
            publish_to_topic.sh
            run_all.sh
        components/
            component_base/Dockerfile
            component_base/requirements.txt
        pipelines/
            pipeline.py
            pipeline_runner.py
            requirements.txt
            runtime_parameters/pipeline_parameter_values.json

    Args:
        pipeline_params (dict): Dictionary containing runtime pipeline parameters.
        custom_training_job_specs (Optional[List]): Specifies the specs to run the
            training job with. Defaults to None.
    """
    # Save parameters as attributes.
    # Copy pipeline_params so the gs_pipeline_spec_path entry added below
    # does not mutate the caller's dictionary.
    self.custom_training_job_specs = custom_training_job_specs
    self.pipeline_params = dict(pipeline_params)
    # Extract additional attributes from defaults file
    defaults = read_yaml_file(GENERATED_DEFAULTS_FILE)
    self.project_id = defaults['gcp']['project_id']
    self.gs_pipeline_job_spec_path = defaults['pipelines']['gs_pipeline_job_spec_path']
    self.base_image = defaults['gcp']['base_image']
    self.use_ci = defaults['tooling']['use_ci']
    # Pub/Sub topic is only relevant when a CI flow publishes to it.
    self.pubsub_topic_name = defaults['gcp']['pubsub_topic_name'] if self.use_ci else None
    self.setup_model_monitoring = defaults['gcp']['setup_model_monitoring']
    # Build necessary folders
    make_dirs([
        f'{BASE_DIR}scripts/pipeline_spec/',
        f'{BASE_DIR}pipelines',
        f'{BASE_DIR}pipelines/runtime_parameters/'
    ])
    # README.md: Write description of the contents of the directory
    write_file(
        filepath=f'{BASE_DIR}README.md',
        text=render_jinja(
            template_path=import_files(KFP_TEMPLATES_PATH) / 'README.md.j2',
            setup_model_monitoring=self.setup_model_monitoring,
            use_ci=self.use_ci),
        mode='w')
    # components/component_base/dockerfile: Write the component base Dockerfile
    write_file(
        filepath=f'{GENERATED_COMPONENT_BASE}/Dockerfile',
        text=render_jinja(
            template_path=import_files(KFP_TEMPLATES_PATH + '.components.component_base') / 'Dockerfile.j2',
            base_image=self.base_image,
            generated_license=GENERATED_LICENSE),
        mode='w')
    # components/component_base/requirements.txt: Write the component base requirements file
    write_file(
        filepath=f'{GENERATED_COMPONENT_BASE}/requirements.txt',
        text=self._create_component_base_requirements(),
        mode='w')
    # Save scripts template path
    scripts_template_path = import_files(KFP_TEMPLATES_PATH + '.scripts')
    # scripts/pipeline_spec/.gitkeep: Write gitkeep to pipeline_spec directory
    write_file(
        filepath=f'{BASE_DIR}scripts/pipeline_spec/.gitkeep',
        text='',
        mode='w')
    # scripts/build_components.sh: Write script for building components
    write_and_chmod(
        filepath=GENERATED_BUILD_COMPONENTS_SH_FILE,
        text=render_jinja(
            template_path=scripts_template_path / 'build_components.sh.j2',
            generated_license=GENERATED_LICENSE,
            base_dir=BASE_DIR))
    # scripts/build_pipeline_spec.sh: Write script for building pipeline specs
    write_and_chmod(
        filepath=GENERATED_PIPELINE_SPEC_SH_FILE,
        text=render_jinja(
            template_path=scripts_template_path / 'build_pipeline_spec.sh.j2',
            generated_license=GENERATED_LICENSE,
            base_dir=BASE_DIR))
    # scripts/run_pipeline.sh: Write script for running pipeline
    write_and_chmod(
        filepath=GENERATED_RUN_PIPELINE_SH_FILE,
        text=render_jinja(
            template_path=scripts_template_path / 'run_pipeline.sh.j2',
            generated_license=GENERATED_LICENSE,
            base_dir=BASE_DIR))
    # scripts/run_all.sh: Write script for running all files
    write_and_chmod(
        filepath=GENERATED_RUN_ALL_SH_FILE,
        text=render_jinja(
            template_path=scripts_template_path / 'run_all.sh.j2',
            generated_license=GENERATED_LICENSE,
            base_dir=BASE_DIR))
    # scripts/publish_to_topic.sh: If using CI, write script for publishing to pubsub topic
    if self.use_ci:
        write_and_chmod(
            filepath=GENERATED_PUBLISH_TO_TOPIC_FILE,
            text=render_jinja(
                template_path=scripts_template_path / 'publish_to_topic.sh.j2',
                base_dir=BASE_DIR,
                generated_license=GENERATED_LICENSE,
                generated_parameter_values_path=GENERATED_PARAMETER_VALUES_PATH,
                pubsub_topic_name=self.pubsub_topic_name))
    # pipelines/pipeline.py: Generates a Kubeflow pipeline spec from custom components.
    components_list = self._get_component_list()
    # Indent the user's pipeline scaffold so it nests inside the templated function body.
    pipeline_scaffold_contents = textwrap.indent(self.pipeline_scaffold, 4 * ' ')
    write_file(
        filepath=GENERATED_PIPELINE_FILE,
        text=render_jinja(
            template_path=import_files(KFP_TEMPLATES_PATH + '.pipelines') / 'pipeline.py.j2',
            components_list=components_list,
            custom_training_job_specs=self.custom_training_job_specs,
            generated_license=GENERATED_LICENSE,
            pipeline_scaffold_contents=pipeline_scaffold_contents,
            project_id=self.project_id),
        mode='w')
    # pipelines/pipeline_runner.py: Sends a PipelineJob to Vertex AI using pipeline spec.
    write_file(
        filepath=GENERATED_PIPELINE_RUNNER_FILE,
        text=render_jinja(
            template_path=import_files(KFP_TEMPLATES_PATH + '.pipelines') / 'pipeline_runner.py.j2',
            generated_license=GENERATED_LICENSE),
        mode='w')
    # pipelines/requirements.txt
    write_file(
        filepath=GENERATED_PIPELINE_REQUIREMENTS_FILE,
        text=render_jinja(
            template_path=import_files(KFP_TEMPLATES_PATH + '.pipelines') / 'requirements.txt.j2',
            pinned_kfp_version=PINNED_KFP_VERSION),
        mode='w')
    # pipelines/runtime_parameters/pipeline_parameter_values.json: Provides runtime parameters for the PipelineJob.
    self.pipeline_params['gs_pipeline_spec_path'] = self.gs_pipeline_job_spec_path
    serialized_params = json.dumps(self.pipeline_params, indent=4)
    write_file(BASE_DIR + GENERATED_PARAMETER_VALUES_PATH, serialized_params, 'w')