in google_cloud_automlops/AutoMLOps.py [0:0]
def generate(
project_id: str,
pipeline_params: Dict,
artifact_repo_location: Optional[str] = DEFAULT_RESOURCE_LOCATION,
artifact_repo_name: Optional[str] = None,
artifact_repo_type: Optional[str] = ArtifactRepository.ARTIFACT_REGISTRY.value,
base_image: Optional[str] = DEFAULT_BASE_IMAGE,
build_trigger_location: Optional[str] = DEFAULT_RESOURCE_LOCATION,
build_trigger_name: Optional[str] = None,
custom_training_job_specs: Optional[List[Dict]] = None,
deployment_framework: Optional[str] = Deployer.GITHUB_ACTIONS.value,
naming_prefix: Optional[str] = DEFAULT_NAMING_PREFIX,
orchestration_framework: Optional[str] = Orchestrator.KFP.value,
pipeline_job_location: Optional[str] = DEFAULT_RESOURCE_LOCATION,
pipeline_job_runner_service_account: Optional[str] = None,
pipeline_job_submission_service_location: Optional[str] = DEFAULT_RESOURCE_LOCATION,
pipeline_job_submission_service_name: Optional[str] = None,
pipeline_job_submission_service_type: Optional[str] = PipelineJobSubmitter.CLOUD_FUNCTIONS.value,
project_number: Optional[str] = None,
    provision_credentials_key: Optional[str] = None,
provisioning_framework: Optional[str] = Provisioner.GCLOUD.value,
pubsub_topic_name: Optional[str] = None,
schedule_location: Optional[str] = DEFAULT_RESOURCE_LOCATION,
schedule_name: Optional[str] = None,
schedule_pattern: Optional[str] = DEFAULT_SCHEDULE_PATTERN,
setup_model_monitoring: Optional[bool] = False,
source_repo_branch: Optional[str] = DEFAULT_SOURCE_REPO_BRANCH,
source_repo_name: Optional[str] = None,
source_repo_type: Optional[str] = CodeRepository.GITHUB.value,
storage_bucket_location: Optional[str] = DEFAULT_RESOURCE_LOCATION,
storage_bucket_name: Optional[str] = None,
use_ci: Optional[bool] = False,
vpc_connector: Optional[str] = DEFAULT_VPC_CONNECTOR,
workload_identity_pool: Optional[str] = None, #TODO: integrate optional creation of pool and provider during provisioning stage
workload_identity_provider: Optional[str] = None,
workload_identity_service_account: Optional[str] = None):
"""Generates relevant pipeline and component artifacts. Check constants file for variable
default values.
Args: See launchAll() function.
"""
# Validate that use_ci=True if schedule_pattern parameter is set or setup_model_monitoring is True
validate_use_ci(deployment_framework,
setup_model_monitoring,
schedule_pattern,
source_repo_type,
use_ci)
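    # Illustrative only: validate_use_ci() is assumed to raise a ValueError
    # along these lines when CI is required but disabled (not the actual
    # implementation):
    #
    #   if schedule_pattern != DEFAULT_SCHEDULE_PATTERN and not use_ci:
    #       raise ValueError('use_ci must be True to use a schedule_pattern.')
    #   if setup_model_monitoring and not use_ci:
    #       raise ValueError('use_ci must be True to use setup_model_monitoring.')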
# Validate currently supported tools
    if artifact_repo_type not in [e.value for e in ArtifactRepository]:
        raise ValueError(
            f'Unsupported artifact repository type: {artifact_repo_type}. '
            f'Supported types include: {", ".join([e.value for e in ArtifactRepository])}')
    if source_repo_type not in [e.value for e in CodeRepository]:
        raise ValueError(
            f'Unsupported source repository type: {source_repo_type}. '
            f'Supported types include: {", ".join([e.value for e in CodeRepository])}')
    if pipeline_job_submission_service_type not in [e.value for e in PipelineJobSubmitter]:
        raise ValueError(
            f'Unsupported pipeline job submission service type: {pipeline_job_submission_service_type}. '
            f'Supported types include: {", ".join([e.value for e in PipelineJobSubmitter])}')
    if orchestration_framework not in [e.value for e in Orchestrator]:
        raise ValueError(
            f'Unsupported orchestration framework: {orchestration_framework}. '
            f'Supported frameworks include: {", ".join([e.value for e in Orchestrator])}')
    if provisioning_framework not in [e.value for e in Provisioner]:
        raise ValueError(
            f'Unsupported provisioning framework: {provisioning_framework}. '
            f'Supported frameworks include: {", ".join([e.value for e in Provisioner])}')
    if deployment_framework not in [e.value for e in Deployer]:
        raise ValueError(
            f'Unsupported deployment framework: {deployment_framework}. '
            f'Supported frameworks include: {", ".join([e.value for e in Deployer])}')
# Make standard directories
logging.info(f'Writing directories under {BASE_DIR}')
make_dirs(GENERATED_DIRS)
# Make optional directories
if use_ci:
make_dirs(GENERATED_SERVICES_DIRS)
if provisioning_framework == Provisioner.TERRAFORM.value:
make_dirs(GENERATED_TERRAFORM_DIRS)
if deployment_framework == Deployer.GITHUB_ACTIONS.value:
make_dirs(GENERATED_GITHUB_DIRS)
if setup_model_monitoring:
make_dirs(GENERATED_MODEL_MONITORING_DIRS)
    # Derive default values for any variables that were not explicitly provided
derived_artifact_repo_name = coalesce(artifact_repo_name, f'{naming_prefix}-artifact-registry')
derived_build_trigger_name = coalesce(build_trigger_name, f'{naming_prefix}-build-trigger')
derived_custom_training_job_specs = stringify_job_spec_list(custom_training_job_specs)
derived_pipeline_job_runner_service_account = coalesce(pipeline_job_runner_service_account, f'vertex-pipelines@{project_id}.iam.gserviceaccount.com')
derived_pipeline_job_submission_service_name = coalesce(pipeline_job_submission_service_name, f'{naming_prefix}-job-submission-svc')
derived_pubsub_topic_name = coalesce(pubsub_topic_name, f'{naming_prefix}-queueing-svc')
derived_schedule_name = coalesce(schedule_name, f'{naming_prefix}-schedule')
derived_source_repo_name = coalesce(source_repo_name, f'{naming_prefix}-repository')
derived_storage_bucket_name = coalesce(storage_bucket_name, f'{project_id}-{naming_prefix}-bucket')
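    # coalesce() is assumed to behave like SQL COALESCE, returning its first
    # non-None argument; a minimal sketch under that assumption:
    #
    #   def coalesce(*args):
    #       return next((arg for arg in args if arg is not None), None)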
# Write defaults.yaml
logging.info(f'Writing configurations to {GENERATED_DEFAULTS_FILE}')
defaults = create_default_config(
artifact_repo_location=artifact_repo_location,
artifact_repo_name=derived_artifact_repo_name,
artifact_repo_type=artifact_repo_type,
base_image=base_image,
build_trigger_location=build_trigger_location,
build_trigger_name=derived_build_trigger_name,
deployment_framework=deployment_framework,
naming_prefix=naming_prefix,
orchestration_framework=orchestration_framework,
pipeline_job_location=pipeline_job_location,
pipeline_job_runner_service_account=derived_pipeline_job_runner_service_account,
pipeline_job_submission_service_location=pipeline_job_submission_service_location,
pipeline_job_submission_service_name=derived_pipeline_job_submission_service_name,
pipeline_job_submission_service_type=pipeline_job_submission_service_type,
project_id=project_id,
provisioning_framework=provisioning_framework,
pubsub_topic_name=derived_pubsub_topic_name,
schedule_location=schedule_location,
schedule_name=derived_schedule_name,
schedule_pattern=schedule_pattern,
setup_model_monitoring=setup_model_monitoring,
source_repo_branch=source_repo_branch,
source_repo_name=derived_source_repo_name,
source_repo_type=source_repo_type,
storage_bucket_location=storage_bucket_location,
storage_bucket_name=derived_storage_bucket_name,
use_ci=use_ci,
vpc_connector=vpc_connector)
write_file(GENERATED_DEFAULTS_FILE, DEFAULTS_HEADER, 'w')
write_yaml_file(GENERATED_DEFAULTS_FILE, defaults, 'a')
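    # The generated defaults file is assumed to mirror the kwargs passed to
    # create_default_config() above; an illustrative (hypothetical) snippet:
    #
    #   gcp:
    #     project_id: my-project
    #     artifact_repo_type: artifact-registry
    #     use_ci: true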
# Generate files required to run a Kubeflow pipeline
if orchestration_framework == Orchestrator.KFP.value:
# Write kubeflow pipeline code
logging.info(f'Writing kubeflow pipelines code to {BASE_DIR}pipelines')
kfppipe = KFPPipeline(func=pipeline_glob.func,
name=pipeline_glob.name,
description=pipeline_glob.description,
comps_dict=components_dict)
kfppipe.build(pipeline_params, derived_custom_training_job_specs)
# Write kubeflow components code
logging.info(f'Writing kubeflow components code to {BASE_DIR}components')
for comp in kfppipe.comps:
logging.info(f' -- Writing {comp.name}')
KFPComponent(func=comp.func, packages_to_install=comp.packages_to_install).build()
        # If use_ci was specified, write the submission service scripts
if use_ci:
logging.info(f'Writing submission service code to {BASE_DIR}services')
KFPServices().build()
# Generate files required to provision resources
if provisioning_framework == Provisioner.GCLOUD.value:
logging.info(f'Writing gcloud provisioning code to {BASE_DIR}provision')
GCloud(provision_credentials_key=provision_credentials_key).build()
elif provisioning_framework == Provisioner.TERRAFORM.value:
logging.info(f'Writing terraform provisioning code to {BASE_DIR}provision')
Terraform(provision_credentials_key=provision_credentials_key).build()
# Pulumi - Currently a roadmap item
# elif provisioning_framework == Provisioner.PULUMI.value:
# Pulumi(provision_credentials_key=provision_credentials_key).build()
# Generate files required to run cicd pipeline
if deployment_framework == Deployer.CLOUDBUILD.value:
logging.info(f'Writing cloud build config to {GENERATED_CLOUDBUILD_FILE}')
CloudBuild().build()
elif deployment_framework == Deployer.GITHUB_ACTIONS.value:
        if project_number is None:
            raise ValueError('project_number must be specified in order to use the GitHub Actions integration.')
logging.info(f'Writing GitHub Actions config to {GENERATED_GITHUB_ACTIONS_FILE}')
GitHubActions(
project_number=project_number,
workload_identity_pool=workload_identity_pool,
workload_identity_provider=workload_identity_provider,
workload_identity_service_account=workload_identity_service_account
).build()
logging.info('Code Generation Complete.')