def schedule_pipeline()

in python/pipelines/pipeline_ops.py


def schedule_pipeline(
        project_id: str,
        region: str,
        template_path: str,
        pipeline_name: str,
        pipeline_sa: str,
        pipeline_root: str,
        cron: str,
        max_concurrent_run_count: int,
        start_time: str,
        end_time: Optional[str] = None,
        subnetwork: str = "default",
        use_private_service_access: bool = False,
        pipeline_parameters: Optional[Dict[str, Any]] = None,
        pipeline_parameters_substitutions: Optional[Dict[str, Any]] = None,
    ) -> "aiplatform.PipelineJob":
    """
    This function schedules a Vertex AI Pipeline to run on a regular basis.

    Args:
        project_id: The ID of the project that contains the pipeline.
        region: The location of the pipeline.
        template_path: The path of the compiled pipeline template file.
        pipeline_name: The name of the pipeline to schedule.
        pipeline_sa: The service account to use for the pipeline.
        pipeline_root: The root directory of the pipeline.
        cron: The cron expression that defines the schedule.
        max_concurrent_run_count: The maximum number of concurrent pipeline runs.
        start_time: The start time of the schedule, as an RFC 3339 timestamp string.
        end_time: The optional end time of the schedule, as an RFC 3339 timestamp string.
        subnetwork: The VPC subnetwork name to be used in VPC peering.
        use_private_service_access: Whether to run the pipeline using VPC private service access.
        pipeline_parameters: The parameters to pass to the pipeline run.
        pipeline_parameters_substitutions: An optional mapping used to substitute
            placeholder values in pipeline_parameters.

    Returns:
        The aiplatform.PipelineJob object that was scheduled.

    Raises:
        Exception: If an error occurs while scheduling the pipeline.
    """

    from google.cloud import aiplatform

    # Resolve placeholder values in pipeline_parameters using the provided substitutions
    if pipeline_parameters_substitutions is not None:
        pipeline_parameters = substitute_pipeline_params(
            pipeline_parameters, pipeline_parameters_substitutions)
    
    # Delete any existing schedules for this pipeline to avoid duplicates
    delete_schedules(project_id, region, pipeline_name)

    # Create a PipelineJob object
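    # (template_path may be a local file path, a GCS URI, or an
    # Artifact Registry URI pointing to the compiled pipeline spec)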
    pipeline_job = aiplatform.PipelineJob(
        template_path=template_path,
        pipeline_root=pipeline_root,
        location=region,
        display_name=f"{pipeline_name}",
    )

    # https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.PipelineJobSchedule
    # Create a schedule for the pipeline job
    pipeline_job_schedule = aiplatform.PipelineJobSchedule(
        display_name=f"{pipeline_name}",
        pipeline_job=pipeline_job,
        location=region
    )

    # Get the project number to use in the network identifier
    project_number = _get_project_number(project_id)

    # Create the schedule, attaching the VPC network only when
    # private service access is requested
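    # Note: the network argument must be the full resource name, i.e.
    # projects/{project_number}/global/networks/{network_name}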
    if use_private_service_access:
        pipeline_job_schedule.create(
            cron=cron,
            max_concurrent_run_count=max_concurrent_run_count,
            start_time=start_time,
            end_time=end_time,
            service_account=pipeline_sa,
            network=f"projects/{project_number}/global/networks/{subnetwork}",
            create_request_timeout=None,
        )
    else:
        pipeline_job_schedule.create(
            cron=cron,
            max_concurrent_run_count=max_concurrent_run_count,
            start_time=start_time,
            end_time=end_time,
            service_account=pipeline_sa,
            create_request_timeout=None,
        )

    logging.info(f"Pipeline scheduled: {pipeline_name}")

    return pipeline_job
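
Usage example (a minimal sketch: the project, bucket, service-account, and import-path values below are hypothetical, not taken from the repository):

from pipelines.pipeline_ops import schedule_pipeline  # import path assumed

schedule_pipeline(
    project_id="my-project",
    region="us-central1",
    template_path="gs://my-bucket/templates/my-pipeline.yaml",
    pipeline_name="my-feature-pipeline",
    pipeline_sa="pipeline-runner@my-project.iam.gserviceaccount.com",
    pipeline_root="gs://my-bucket/pipeline-root",
    cron="0 4 * * *",                    # run daily at 04:00
    max_concurrent_run_count=1,
    start_time="2025-01-01T00:00:00Z",   # RFC 3339 timestamp
    pipeline_parameters={"lookback_days": 30},
)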