in python/pipelines/pipeline_ops.py [0:0]
def schedule_pipeline(
    project_id: str,
    region: str,
    template_path: str,
    pipeline_name: str,
    pipeline_sa: str,
    pipeline_root: str,
    cron: str,
    max_concurrent_run_count: int,
    start_time: str,
    end_time: Optional[str] = None,
    subnetwork: str = "default",
    use_private_service_access: bool = False,
    pipeline_parameters: Optional[Dict[str, Any]] = None,
    pipeline_parameters_substitutions: Optional[Dict[str, Any]] = None,
) -> "aiplatform.PipelineJob":
"""
This function schedules a Vertex AI Pipeline to run on a regular basis.
Args:
project_id: The ID of the project that contains the pipeline.
region: The location of the pipeline.
pipeline_name: The name of the pipeline to schedule.
pipeline_template_uri: The URI of the pipeline template file.
pipeline_sa: The service account to use for the pipeline.
pipeline_root: The root directory of the pipeline.
cron: The cron expression that defines the schedule.
max_concurrent_run_count: The maximum number of concurrent pipeline runs.
start_time: The start time of the schedule.
end_time: The end time of the schedule.
subnetwork: The VPC subnetwork name to be used in VPC peering.
use_private_service_access: A flag to define whether to use the VPC private service access or not.
Returns:
A dictionary containing information about the scheduled pipeline.
Raises:
Exception: If an error occurs while scheduling the pipeline.
"""
    from google.cloud import aiplatform

    # Apply any substitutions to the pipeline parameters before scheduling
    if pipeline_parameters_substitutions is not None:
        pipeline_parameters = substitute_pipeline_params(
            pipeline_parameters, pipeline_parameters_substitutions)

    # Delete any existing schedules whose display name matches this pipeline,
    # so the new schedule replaces them (see the sketch after this function)
    delete_schedules(project_id, region, pipeline_name)
    # Create a PipelineJob object from the compiled template; pass the
    # (possibly substituted) pipeline parameters so they take effect
    pipeline_job = aiplatform.PipelineJob(
        template_path=template_path,
        pipeline_root=pipeline_root,
        location=region,
        display_name=pipeline_name,
        parameter_values=pipeline_parameters,
    )
    # https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.PipelineJobSchedule
    # Create a schedule resource for the pipeline job
    pipeline_job_schedule = aiplatform.PipelineJobSchedule(
        display_name=pipeline_name,
        pipeline_job=pipeline_job,
        location=region,
    )
    # Get the project number, which the network resource name requires
    # (projects/{project_number}/global/networks/{network})
    project_number = _get_project_number(project_id)

    # Create the schedule, attaching the peered VPC network only when
    # private service access is enabled
    create_kwargs: Dict[str, Any] = dict(
        cron=cron,
        max_concurrent_run_count=max_concurrent_run_count,
        start_time=start_time,
        end_time=end_time,
        service_account=pipeline_sa,
        create_request_timeout=None,
    )
    if use_private_service_access:
        create_kwargs["network"] = (
            f"projects/{project_number}/global/networks/{subnetwork}"
        )
    pipeline_job_schedule.create(**create_kwargs)
    logging.info(f"Pipeline scheduled: {pipeline_name}")
    return pipeline_job
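
# The cleanup step in schedule_pipeline relies on delete_schedules, defined
# elsewhere in this module. The sketch below is one plausible way to perform
# that lookup with the Vertex AI SDK, assuming schedules are matched by
# display name; _delete_schedules_sketch and its filter string are
# illustrative assumptions, not this repository's implementation.
def _delete_schedules_sketch(project_id: str, region: str, pipeline_name: str) -> None:
    from google.cloud import aiplatform

    # List schedules whose display name matches the pipeline name
    schedules = aiplatform.PipelineJobSchedule.list(
        filter=f'display_name="{pipeline_name}"',
        project=project_id,
        location=region,
    )
    for schedule in schedules:
        schedule.delete()  # blocks until the schedule resource is removed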
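
# A minimal usage sketch for schedule_pipeline. Every concrete value below
# (project ID, bucket paths, service account, cron expression, timestamp,
# parameters) is a hypothetical placeholder, not a value from this module.
if __name__ == "__main__":
    scheduled_job = schedule_pipeline(
        project_id="my-gcp-project",  # hypothetical project ID
        region="us-central1",
        template_path="gs://my-bucket/templates/my_pipeline.yaml",  # hypothetical
        pipeline_name="my-training-pipeline",  # hypothetical
        pipeline_sa="pipelines-sa@my-gcp-project.iam.gserviceaccount.com",  # hypothetical
        pipeline_root="gs://my-bucket/pipeline-root",  # hypothetical
        cron="TZ=Etc/UTC 0 4 * * *",  # daily at 04:00 UTC
        max_concurrent_run_count=1,
        start_time="2030-01-01T00:00:00Z",  # RFC 3339 timestamp
        pipeline_parameters={"lookback_days": 30},  # hypothetical parameter
    )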