in python/pipelines/pipeline_ops.py [0:0]
def get_schedules(
project_id: str,
region: str,
pipeline_name: str) -> list:
"""
This function retrieves all schedules associated with a given pipeline name in a specific project and region.
Args:
project_id: The ID of the project that contains the pipeline.
region: The location of the pipeline.
pipeline_name: The name of the pipeline to retrieve schedules for.
Returns:
A list of the schedules associated with the pipeline. If no schedules are found, returns None.
Raises:
Exception: If an error occurs while retrieving the schedules.
"""
# Defines the filter query parameter for the URL request
filter = ""
if pipeline_name is not None:
filter = f"filter=display_name={pipeline_name}"
url = f"https://{region}-aiplatform.googleapis.com/v1beta1/projects/{project_id}/locations/{region}/schedules?{filter}"
# Defines the header for the URL request
headers = requests.structures.CaseInsensitiveDict()
headers["Content-Type"] = "application/json"
headers["Authorization"] = "Bearer {}".format(get_gcp_bearer_token())
# Make the request
resp = requests.get(url=url, headers=headers)
data = resp.json() # Check the JSON Response Content
if "schedules" in data:
return data['schedules']
else:
return None