ingestion-edge/ingestion_edge/flush_manager.py (437 lines of code) (raw):

"""Continuously flush ingestion-edge queue from detached persistent volumes.""" from argparse import ArgumentParser from dataclasses import dataclass from datetime import datetime, timedelta from functools import partial from multiprocessing.pool import ThreadPool from time import sleep from typing import Callable, Dict, List import json import os from kubernetes.config import load_incluster_config from kubernetes.client import ( BatchV1Api, CoreV1Api, V1Container, V1DeleteOptions, V1EnvVar, V1Job, V1JobSpec, V1ObjectMeta, V1ObjectReference, V1OwnerReference, V1PersistentVolume, V1PersistentVolumeClaim, V1PersistentVolumeClaimSpec, V1PersistentVolumeClaimVolumeSource, V1PersistentVolumeSpec, V1Pod, V1PodSpec, V1PodTemplateSpec, V1Preconditions, V1ResourceRequirements, V1Volume, V1VolumeMount, ) from kubernetes.client.rest import ApiException from .config import get_config_dict, logger JOB_AND_PVC_PREFIX = "flush-" ALREADY_EXISTS = "AlreadyExists" CONFLICT = "Conflict" NOT_FOUND = "Not Found" DEFAULT_IMAGE_VERSION = "latest" try: with open("version.json") as fp: DEFAULT_IMAGE_VERSION = str( json.load(fp).get("version") or DEFAULT_IMAGE_VERSION ) except (FileNotFoundError, json.decoder.JSONDecodeError): # pragma: no cover pass # use default DEFAULT_ENV = [ V1EnvVar(name=name, value=os.environ[name]) for name in get_config_dict() if name in os.environ ] FLUSH_JOB_REQUESTS_CPU = os.environ.get("FLUSH_JOB_REQUESTS_CPU", "100m") FLUSH_JOB_REQUESTS_MEMORY = os.environ.get("FLUSH_JOB_REQUESTS_MEMORY", "100Mi") FLUSH_JOB_LIMITS_CPU = os.environ.get("FLUSH_JOB_LIMITS_CPU", None) FLUSH_JOB_LIMITS_MEMORY = os.environ.get("FLUSH_JOB_LIMITS_MEMORY", "500Mi") parser = ArgumentParser(description=__doc__) parser.add_argument( "--command", default=["python", "-m", "ingestion_edge.flush"], type=json.loads, help="Docker command for flush jobs; must drain queue until empty or exit non-zero", ) parser.add_argument( "--env", default=DEFAULT_ENV, type=json.loads, help="env to use for flush jobs", ) parser.add_argument( "--image", default="mozilla/ingestion-edge:" + DEFAULT_IMAGE_VERSION, help="Docker image to use for flush jobs", ) parser.add_argument( "--namespace", default="default", help="Kubernetes namespace to use", ) parser.add_argument( "--service-account-name", default="default", help="Kubernetes service account name to use", ) parser.add_argument( "--claim-prefix", default="queue-", help="Prefix for the names of persistent volume claims to delete", ) parser.add_argument( "--pvc-cleanup-delay-seconds", default=60, type=int, help="Number of seconds to wait for persistent volume claims to remain detached " "before deleting", ) @dataclass(frozen=True) class PvcCacheEntry: """Cache entry for tracking detached persistent volume claims.""" pv: str time: datetime def _job_and_pvc_name_from_pv(pv: V1PersistentVolume) -> str: return JOB_AND_PVC_PREFIX + pv.metadata.name def _pv_name_from_job(job: V1Job) -> str: return job.metadata.name.replace(JOB_AND_PVC_PREFIX, "", 1) def _is_flush_job(job: V1Job) -> bool: return job.metadata.name.startswith(JOB_AND_PVC_PREFIX) def _create_pvc( api: CoreV1Api, name: str, namespace: str, pv: V1PersistentVolume ) -> V1PersistentVolumeClaim: logger.info(f"creating pvc: {name}") try: return api.create_namespaced_persistent_volume_claim( namespace=namespace, body=V1PersistentVolumeClaim( api_version="v1", kind="PersistentVolumeClaim", metadata=V1ObjectMeta(name=name, namespace=namespace), spec=V1PersistentVolumeClaimSpec( access_modes=["ReadWriteOnce"], resources=V1ResourceRequirements(requests=pv.spec.capacity), storage_class_name=pv.spec.storage_class_name, volume_name=pv.metadata.name, ), ), ) except ApiException as e: if e.reason == CONFLICT and json.loads(e.body)["reason"] == ALREADY_EXISTS: logger.info(f"using existing pvc: {name}") return api.read_namespaced_persistent_volume_claim(name, namespace) raise def _bind_pvc( api: CoreV1Api, pv: V1PersistentVolume, pvc: V1PersistentVolumeClaim ) -> V1PersistentVolume: logger.info(f"binding pv to pvc: {pv.metadata.name}, {pvc.metadata.name}") return api.patch_persistent_volume( name=pv.metadata.name, body=V1PersistentVolume( spec=V1PersistentVolumeSpec( claim_ref=V1ObjectReference( api_version="v1", kind="PersistentVolumeClaim", name=pvc.metadata.name, namespace=pvc.metadata.namespace, resource_version=pvc.metadata.resource_version, uid=pvc.metadata.uid, ) ) ), ) def _create_flush_job( batch_api: BatchV1Api, command: List[str], env: List[V1EnvVar], image: str, name: str, namespace: str, service_account_name: str, ) -> V1Job: logger.info(f"creating job: {name}") try: return batch_api.create_namespaced_job( namespace=namespace, body=V1Job( api_version="batch/v1", kind="Job", metadata=V1ObjectMeta(name=name, namespace=namespace), spec=V1JobSpec( template=V1PodTemplateSpec( spec=V1PodSpec( containers=[ V1Container( image=image, command=command, name="flush", volume_mounts=[ V1VolumeMount(mount_path="/data", name="queue") ], env=env, resources=V1ResourceRequirements( requests={ key: value for key, value in ( ("cpu", FLUSH_JOB_REQUESTS_CPU), ("memory", FLUSH_JOB_REQUESTS_MEMORY), ) if value }, limits={ key: value for key, value in ( ("cpu", FLUSH_JOB_LIMITS_CPU), ("memory", FLUSH_JOB_LIMITS_MEMORY), ) if value }, ), ) ], restart_policy="OnFailure", volumes=[ V1Volume( name="queue", persistent_volume_claim=( V1PersistentVolumeClaimVolumeSource( claim_name=name ) ), ) ], service_account_name=service_account_name, ) ) ), ), ) except ApiException as e: if e.reason == CONFLICT and json.loads(e.body)["reason"] == ALREADY_EXISTS: logger.info(f"using existing job: {name}") return batch_api.read_namespaced_job(name, namespace) raise def flush_released_pvs( api: CoreV1Api, batch_api: BatchV1Api, command: List[str], env: List[V1EnvVar], image: str, namespace: str, service_account_name: str, ): """ Flush persistent volumes. Gracefully handle resuming after an interruption, because this is not atomic. """ existing_jobs = { job.metadata.name for job in batch_api.list_namespaced_job(namespace).items } for pv in api.list_persistent_volume().items: name = _job_and_pvc_name_from_pv(pv) if ( name not in existing_jobs and pv.spec.claim_ref and pv.spec.claim_ref.namespace == namespace and pv.spec.persistent_volume_reclaim_policy != "Delete" and pv.status and (pv.status.phase == "Released" or pv.spec.claim_ref.name == name) ): logger.info(f"flushing unbound pv: {pv.metadata.name}") if pv.status.phase != "Bound": pvc = _create_pvc(api, name, namespace, pv) _bind_pvc(api, pv, pvc) _create_flush_job( batch_api, command, env, image, name, namespace, service_account_name ) def delete_complete_jobs(api: CoreV1Api, batch_api: BatchV1Api, namespace: str): """Delete complete jobs.""" for job in batch_api.list_namespaced_job(namespace).items: if ( job.status.conditions and job.status.conditions[0].type == "Complete" and not job.metadata.deletion_timestamp and _is_flush_job(job) ): logger.info(f"deleting complete job: {job.metadata.name}") # configure persistent volume claims to be deleted with the job pv_name = _pv_name_from_job(job) logger.info(f"including pv in pvc delete: {pv_name}") api.patch_persistent_volume( name=pv_name, body=V1PersistentVolume( spec=V1PersistentVolumeSpec( persistent_volume_reclaim_policy="Delete", ) ), ) logger.info(f"including pvc in job delete: {job.metadata.name}") api.patch_namespaced_persistent_volume_claim( name=job.metadata.name, namespace=namespace, body=V1PersistentVolumeClaim( metadata=V1ObjectMeta( owner_references=[ V1OwnerReference( api_version="batch/v1", kind="Job", name=job.metadata.name, uid=job.metadata.uid, block_owner_deletion=True, ) ] ) ), ) try: batch_api.delete_namespaced_job( name=job.metadata.name, namespace=namespace, body=V1DeleteOptions( grace_period_seconds=0, propagation_policy="Foreground", preconditions=V1Preconditions( resource_version=job.metadata.resource_version, uid=job.metadata.uid, ), ), ) except ApiException as e: if e.reason not in (CONFLICT, NOT_FOUND): raise logger.info(f"job already deleted or updated: {job.metadata.name}") def flush_released_pvs_and_delete_complete_jobs( api: CoreV1Api, batch_api: BatchV1Api, command: List[str], env: List[V1EnvVar], image: str, namespace: str, service_account_name: str, ): """Flush released persistent volumes then delete complete jobs. Run sequentially to avoid race conditions. """ flush_released_pvs( api, batch_api, command, env, image, namespace, service_account_name ) delete_complete_jobs(api, batch_api, namespace) def _unschedulable_due_to_pvc(pod: V1Pod): return ( pod.status and pod.status.phase == "Pending" and any( condition.reason == "Unschedulable" and condition.message and condition.message.startswith('persistentvolumeclaim "') and condition.message.endswith('" not found') for condition in (pod.status.conditions or []) if condition ) and pod.metadata.owner_references and any(ref.kind == "StatefulSet" for ref in pod.metadata.owner_references) ) def delete_detached_pvcs( api: CoreV1Api, namespace: str, claim_prefix: str, pvc_cleanup_delay: timedelta, detached_pvc_cache: Dict[str, PvcCacheEntry], ): """ Delete persistent volume claims that are not attached to any pods. If a persistent volume claim is deleted while attached to a pod, then the underlying persistent volume will remain bound until the delete is finalized, and the delete will not be finalized until the pod is also deleted. If a stateful set immediately recreates a pod (e.g. via `kubectl rollout restart`) that was attached to a persistent volume claim that was deleted, then the stateful set may still try to reuse the persistent volume claim after the delete is finalized. Delete the pod again to cause the stateful set to recreate the persistent volume claim when it next recreates the pod. """ attached_pvcs = { volume.persistent_volume_claim.claim_name for pod in api.list_namespaced_pod(namespace).items if not _unschedulable_due_to_pvc(pod) and pod.spec and pod.spec.volumes for volume in pod.spec.volumes if volume.persistent_volume_claim } for pvc in api.list_namespaced_persistent_volume_claim(namespace).items: if ( pvc.metadata.name.startswith(claim_prefix) and pvc.metadata.name not in attached_pvcs and not pvc.metadata.deletion_timestamp ): name, pv, now = pvc.metadata.name, pvc.spec.volume_name, datetime.utcnow() if name not in detached_pvc_cache or detached_pvc_cache[name].pv != pv: logger.info(f"found newly detached pvc: {pvc.metadata.name}") detached_pvc_cache[name] = PvcCacheEntry(pv, now) if (now - detached_pvc_cache[name].time) < pvc_cleanup_delay: # wait for pvc to remain detached for pvc_cleanup_delay before deleting continue logger.info(f"deleting detached pvc: {pvc.metadata.name}") try: api.delete_namespaced_persistent_volume_claim( name=pvc.metadata.name, namespace=namespace, body=V1DeleteOptions( grace_period_seconds=0, propagation_policy="Background", preconditions=V1Preconditions( resource_version=pvc.metadata.resource_version, uid=pvc.metadata.uid, ), ), ) except ApiException as e: if e.reason not in (CONFLICT, NOT_FOUND): raise logger.info(f"pvc already deleted or updated: {pvc.metadata.name}") else: # pvc is not detached, drop from cache if present detached_pvc_cache.pop(pvc.metadata.name, None) def delete_unschedulable_pods(api: CoreV1Api, namespace: str): """ Delete pods that are unschedulable due to a missing persistent volume claim. A stateful set may create a pod attached to a missing persistent volume claim if the pod is recreated while the persistent volume claim is pending delete. When this happens, delete the pod so that the stateful set will create a new persistent volume claim when it next creates the pod. """ for pod in api.list_namespaced_pod(namespace).items: if _unschedulable_due_to_pvc(pod): logger.info(f"deleting unschedulable pod: {pod.metadata.name}") try: api.delete_namespaced_pod( name=pod.metadata.name, namespace=namespace, body=V1DeleteOptions( grace_period_seconds=0, propagation_policy="Background", preconditions=V1Preconditions( resource_version=pod.metadata.resource_version, uid=pod.metadata.uid, ), ), ) except ApiException as e: if e.reason not in (CONFLICT, NOT_FOUND): raise logger.info(f"pod already deleted or updated: {pod.metadata.name}") def run_task(func: Callable[[], None]): """Continuously run func and print exceptions.""" while True: try: func() except Exception: logger.exception("unhandled exception") else: sleep(1) def main(): """Continuously flush and delete detached persistent volumes.""" args = parser.parse_args() load_incluster_config() api = CoreV1Api() batch_api = BatchV1Api() tasks = [ partial( flush_released_pvs_and_delete_complete_jobs, api, batch_api, args.command, args.env, args.image, args.namespace, args.service_account_name, ), partial( delete_detached_pvcs, api, args.namespace, args.claim_prefix, timedelta(seconds=args.pvc_cleanup_delay_seconds), {}, # detached_pvc_cache ), partial(delete_unschedulable_pods, api, args.namespace), ] with ThreadPool(len(tasks)) as pool: pool.map(run_task, tasks, chunksize=1) if __name__ == "__main__": # pragma: no cover main()