def get_schedules()

in python/pipelines/pipeline_ops.py [0:0]


def get_schedules(
        project_id: str,
        region: str,
        pipeline_name: str) -> list:
    """
    This function retrieves all schedules associated with a given pipeline name in a specific project and region.

    Args:
        project_id: The ID of the project that contains the pipeline.
        region: The location of the pipeline.
        pipeline_name: The name of the pipeline to retrieve schedules for.

    Returns:
        A list of the schedules associated with the pipeline. If no schedules are found, returns None.

    Raises:
        Exception: If an error occurs while retrieving the schedules.
    """

    # Defines the filter query parameter for the URL request
    filter = ""
    if pipeline_name is not None:
        filter = f"filter=display_name={pipeline_name}"
    url = f"https://{region}-aiplatform.googleapis.com/v1beta1/projects/{project_id}/locations/{region}/schedules?{filter}"

    # Defines the header for the URL request
    headers = requests.structures.CaseInsensitiveDict()
    headers["Content-Type"] = "application/json"
    headers["Authorization"] = "Bearer {}".format(get_gcp_bearer_token())

    # Make the request
    resp = requests.get(url=url, headers=headers)
    data = resp.json()  # Check the JSON Response Content 
    if "schedules" in data:
        return data['schedules']
    else:
        return None