in google_cloud_automlops/utils/utils.py [0:0]
def precheck_deployment_requirements(defaults: dict):
    """Checks to see if the necessary MLOps infra exists to run the deploy() step based on the user
    tooling selection determined during the generate() step.

    The checks run in this order and the first failure aborts the precheck:
      1. Every API returned by get_required_apis(defaults) is in the ENABLED state.
      2. The Artifact Registry repository exists (only when that repository type is selected).
      3. The storage bucket exists.
      4. The pipeline job runner service account exists.
      5. The runner service account holds every role in IAM_ROLES_RUNNER_SA on the project.
      6. When use_ci is set: the Pub/Sub topic and subscription, the pipeline job submission
         service (Cloud Run or Cloud Functions) and, for the Cloud Build deployment framework,
         the build trigger all exist.

    Args:
        defaults: Contents of the Defaults yaml file (config/defaults.yaml).

    Raises:
        RuntimeError: If a required API is not enabled, a required resource cannot be retrieved,
            or the runner service account is missing required IAM roles.
        KeyError: If an expected entry is missing from defaults.
    """

    def _execute(request, failure_message=None):
        """Executes a discovery request and re-raises any failure as a RuntimeError.

        Only the network call is guarded, so RuntimeErrors raised by the validation logic
        that follows a successful call are not swallowed and re-wrapped.
        """
        try:
            return request.execute()
        except Exception as err:
            message = failure_message or f'An error was encountered: {err}'
            raise RuntimeError(message) from err

    # Read all configuration up front so a malformed defaults file fails before any API call.
    use_ci = defaults['tooling']['use_ci']
    artifact_repo_location = defaults['gcp']['artifact_repo_location']
    artifact_repo_name = defaults['gcp']['artifact_repo_name']
    artifact_repo_type = defaults['gcp']['artifact_repo_type']
    storage_bucket_name = defaults['gcp']['storage_bucket_name']
    pipeline_job_runner_service_account = defaults['gcp']['pipeline_job_runner_service_account']
    if use_ci:
        pubsub_topic_name = defaults['gcp']['pubsub_topic_name']
        pipeline_job_submission_service_name = defaults['gcp']['pipeline_job_submission_service_name']
        pipeline_job_submission_service_location = defaults['gcp']['pipeline_job_submission_service_location']
        pipeline_job_submission_service_type = defaults['gcp']['pipeline_job_submission_service_type']
        submission_svc_prefix = 'gcr' if pipeline_job_submission_service_type == PipelineJobSubmitter.CLOUD_RUN.value else 'gcf'
        pubsub_subscription_name = f'''{submission_svc_prefix}-{pipeline_job_submission_service_name}-{pipeline_job_submission_service_location}-{pubsub_topic_name}'''
        build_trigger_name = defaults['gcp']['build_trigger_name']
        build_trigger_location = defaults['gcp']['build_trigger_location']
        deployment_framework = defaults['tooling']['deployment_framework']

    credentials, project = google.auth.default()

    logging.info(f'Checking for required API services in project {project}...')
    service = discovery.build('serviceusage', 'v1', credentials=credentials, cache_discovery=False)
    for api in get_required_apis(defaults):
        response = _execute(service.services().get(name=f'projects/{project}/services/{api}'))
        # A response without a 'state' field is treated as not enabled.
        if response.get('state') != 'ENABLED':
            raise RuntimeError(f'{api} must be enabled in order to use AutoMLOps. '
                               'Please enable this API and re-run.')

    if artifact_repo_type == ArtifactRepository.ARTIFACT_REGISTRY.value:
        logging.info(f'Checking for Artifact Registry in project {project}...')
        service = discovery.build('artifactregistry', 'v1', credentials=credentials, cache_discovery=False)
        _execute(
            service.projects().locations().repositories().get(
                name=f'projects/{project}/locations/{artifact_repo_location}/repositories/{artifact_repo_name}'),
            f'Artifact Registry {artifact_repo_name} not found in project {project}. '
            'Please create registry and continue.')

    logging.info(f'Checking for Storage Bucket in project {project}...')
    service = discovery.build('storage', 'v1', credentials=credentials, cache_discovery=False)
    _execute(
        service.buckets().get(bucket=storage_bucket_name),
        f'Storage Bucket {storage_bucket_name} not found in project {project}. '
        'Please create bucket and continue.')

    logging.info(f'Checking for Pipeline Runner Service Account in project {project}...')
    service = discovery.build('iam', 'v1', credentials=credentials, cache_discovery=False)
    _execute(
        service.projects().serviceAccounts().get(
            name=f'projects/{project}/serviceAccounts/{pipeline_job_runner_service_account}'),
        f'Service Account {pipeline_job_runner_service_account} not found in project {project}. '
        'Please create service account and continue.')

    logging.info(f'Checking for IAM roles on Pipeline Runner Service Account in project {project}...')
    service = discovery.build('cloudresourcemanager', 'v1', credentials=credentials, cache_discovery=False)
    response = _execute(
        service.projects().getIamPolicy(
            resource=project, body={'options': {'requestedPolicyVersion': 3}}))
    runner_member = f'serviceAccount:{pipeline_job_runner_service_account}'
    # 'bindings' may be absent from the policy response; treat that as no roles granted
    # so the user gets the missing-roles message instead of a KeyError.
    iam_roles = {
        binding['role']
        for binding in response.get('bindings', [])
        if runner_member in binding.get('members', [])
    }
    missing_roles = set(IAM_ROLES_RUNNER_SA).difference(iam_roles)
    if missing_roles:
        raise RuntimeError('Missing the following IAM roles for service account '
                           f'{pipeline_job_runner_service_account}: {missing_roles}. '
                           'Please update service account roles and continue.')

    if use_ci:
        # One Pub/Sub client serves both the topic and the subscription lookups.
        service = discovery.build('pubsub', 'v1', credentials=credentials, cache_discovery=False)

        logging.info(f'Checking for Pub/Sub Topic in project {project}...')
        _execute(
            service.projects().topics().get(
                topic=f'projects/{project}/topics/{pubsub_topic_name}'),
            f'Pub/Sub Topic {pubsub_topic_name} not found in project {project}. '
            'Please create Pub/Sub Topic and continue.')

        logging.info(f'Checking for Pub/Sub Subscription in project {project}...')
        _execute(
            service.projects().subscriptions().get(
                subscription=f'projects/{project}/subscriptions/{pubsub_subscription_name}'),
            f'Pub/Sub Subscription {pubsub_subscription_name} not found in project {project}. '
            'Please create Pub/Sub Subscription and continue.')

        if pipeline_job_submission_service_type == PipelineJobSubmitter.CLOUD_RUN.value:
            logging.info(f'Checking for Cloud Run Pipeline Job Submission Service in project {project}...')
            service = discovery.build('run', 'v1', credentials=credentials, cache_discovery=False)
            _execute(
                service.projects().locations().services().get(
                    name=f'projects/{project}/locations/{pipeline_job_submission_service_location}/services/{pipeline_job_submission_service_name}'),
                f'Cloud Run Pipeline Job Submission Service {pipeline_job_submission_service_name} not found in project {project}. '
                'Please redeploy the submission service and continue.')
        elif pipeline_job_submission_service_type == PipelineJobSubmitter.CLOUD_FUNCTIONS.value:
            logging.info(f'Checking for Cloud Functions Pipeline Job Submission Service in project {project}...')
            service = discovery.build('cloudfunctions', 'v1', credentials=credentials, cache_discovery=False)
            _execute(
                service.projects().locations().functions().get(
                    name=f'projects/{project}/locations/{pipeline_job_submission_service_location}/functions/{pipeline_job_submission_service_name}'),
                f'Cloud Functions Pipeline Job Submission Service {pipeline_job_submission_service_name} not found in project {project}. '
                'Please redeploy the submission service and continue.')

        if deployment_framework == Deployer.CLOUDBUILD.value:
            logging.info(f'Checking for Cloud Build Trigger in project {project}...')
            service = discovery.build('cloudbuild', 'v1', credentials=credentials, cache_discovery=False)
            _execute(
                service.projects().locations().triggers().get(
                    name=f'projects/{project}/locations/{build_trigger_location}/triggers/{build_trigger_name}',
                    projectId=project, triggerId=build_trigger_name),
                f'Cloud Build Trigger {build_trigger_name} not found in project {project}. '
                'Please create Cloud Build Trigger and continue.')

    logging.info('Precheck successfully completed, continuing to deployment.\n')