in python/pipelines/pipeline_ops.py [0:0]
def delete_schedules(
        project_id: str,
        region: str,
        pipeline_name: str) -> list:
    """
    Delete all schedules associated with a given pipeline name in a project and region.

    The schedules are looked up with `get_schedules` and then removed one at a
    time by sending an HTTP DELETE to the regional
    `{region}-aiplatform.googleapis.com/v1beta1` endpoint.

    Args:
        project_id: The ID of the project that contains the pipeline.
        region: The location of the pipeline; also used to build the API host name.
        pipeline_name: The name of the pipeline to delete schedules for.

    Returns:
        A list with the `name` of every schedule that was deleted. Returns None
        when `get_schedules` returns None (no schedules found).

    Raises:
        requests.HTTPError: If a delete request is answered with a 4xx/5xx
            status. Schedules deleted before the failing request remain deleted.
    """
    # Get all schedules for the given pipeline name
    schedules = get_schedules(project_id, region, pipeline_name)
    if schedules is None:
        logging.info(f"No schedules found with display_name {pipeline_name}")
        return None

    # Defines the header used in the API request
    headers = requests.structures.CaseInsensitiveDict()
    headers["Content-Type"] = "application/json"
    headers["Authorization"] = f"Bearer {get_gcp_bearer_token()}"

    # Delete each schedule returned by the lookup
    deleted_schedules = []
    for schedule in schedules:
        schedule_name = schedule['name']
        # The schedule's `name` is used verbatim as the resource path of the URL.
        url = f"https://{region}-aiplatform.googleapis.com/v1beta1/{schedule_name}"
        resp = requests.delete(url=url, headers=headers)

        # Fail loudly on an error status instead of reporting a schedule as
        # deleted when the API rejected the request.
        resp.raise_for_status()

        logging.info(f"scheduled resourse {schedule_name} deleted")
        deleted_schedules.append(schedule_name)

    return deleted_schedules