in src/checker_common.py [0:0]
def get_created_jobs(release_names: Iterable[str]) -> Iterable[str]:
"""Get jobs created by a given helm release.
Args:
release_names: Iterable of helm release names to get jobs for.
Returns:
Iterable of job names created by the helm releases.
"""
config.load_incluster_config()
batch_v1 = batch_v1_api.BatchV1Api()
try:
jobs = batch_v1.list_namespaced_job(namespace="default").items
release_name_annotation_key = "meta.helm.sh/release-name"
matching_jobs = [
job.metadata.name
for job in jobs
if job.metadata.annotations
and release_name_annotation_key in job.metadata.annotations
and job.metadata.annotations[release_name_annotation_key]
in release_names
]
return matching_jobs
except client.ApiException as e:
print(f"Error getting Jobs: {e}")
return []