in ingestion-edge/ingestion_edge/flush_manager.py [0:0]
def delete_complete_jobs(api: CoreV1Api, batch_api: BatchV1Api, namespace: str):
    """Delete complete flush jobs together with their volume and claim.

    Lists every job in ``namespace`` and, for each one that
    ``_is_flush_job`` accepts, that reports a ``Complete`` condition with
    status ``"True"``, and that has no deletion timestamp yet:

    1. patches the persistent volume named by ``_pv_name_from_job`` so its
       reclaim policy is ``Delete``;
    2. patches the persistent volume claim that shares the job's name so
       the job becomes its owner;
    3. deletes the job with foreground propagation, guarded by
       preconditions on the job's uid and resource version.

    Args:
        api: Core API client used to patch the volume and the claim.
        batch_api: Batch API client used to list and delete jobs.
        namespace: Namespace whose jobs are inspected.

    Raises:
        ApiException: If the job delete fails for a reason other than
            ``CONFLICT`` or ``NOT_FOUND``. Errors raised by the list or
            patch calls are not handled here and propagate as-is.
    """
    for job in batch_api.list_namespaced_job(namespace).items:
        # Inspect every condition rather than only the first entry: the
        # list may hold other condition types ahead of "Complete", and a
        # condition only applies while its status is "True".
        is_complete = any(
            condition.type == "Complete" and condition.status == "True"
            for condition in (job.status.conditions or ())
        )
        if (
            not is_complete
            # a deletion timestamp means a delete is already in progress
            or job.metadata.deletion_timestamp
            or not _is_flush_job(job)
        ):
            continue

        job_name = job.metadata.name
        logger.info(f"deleting complete job: {job_name}")

        # configure persistent volume claims to be deleted with the job
        pv_name = _pv_name_from_job(job)
        logger.info(f"including pv in pvc delete: {pv_name}")
        api.patch_persistent_volume(
            name=pv_name,
            body=V1PersistentVolume(
                spec=V1PersistentVolumeSpec(
                    persistent_volume_reclaim_policy="Delete",
                )
            ),
        )

        # Making the job the owner of the claim lets the cluster's garbage
        # collection remove the claim once the job is deleted.
        logger.info(f"including pvc in job delete: {job_name}")
        api.patch_namespaced_persistent_volume_claim(
            name=job_name,
            namespace=namespace,
            body=V1PersistentVolumeClaim(
                metadata=V1ObjectMeta(
                    owner_references=[
                        V1OwnerReference(
                            api_version="batch/v1",
                            kind="Job",
                            name=job_name,
                            uid=job.metadata.uid,
                            block_owner_deletion=True,
                        )
                    ]
                )
            ),
        )

        try:
            batch_api.delete_namespaced_job(
                name=job_name,
                namespace=namespace,
                body=V1DeleteOptions(
                    grace_period_seconds=0,
                    propagation_policy="Foreground",
                    # Only delete the exact object revision that was
                    # listed above; a changed or replaced job is rejected.
                    preconditions=V1Preconditions(
                        resource_version=job.metadata.resource_version,
                        uid=job.metadata.uid,
                    ),
                ),
            )
        except ApiException as e:
            if e.reason not in (CONFLICT, NOT_FOUND):
                raise
            logger.info(f"job already deleted or updated: {job_name}")