# parse_pod_item — excerpt from src/docker-images/watchdog/src/watchdog.py


def parse_pod_item(pod, k8s_pod_gauge, k8s_container_gauge, pods_info,
                   service_endpoints, vc_usage):
    """ add metrics to k8s_pod_gauge or k8s_container_gauge if successfully parsed pod.
    Because we are parsing json output by k8s, its format is subject to change,
    we should test if field exists before accessing it to avoid KeyError

    Args:
        pod: dict decoded from a k8s pod API object.
        k8s_pod_gauge: gauge receiving one sample per parsed pod.
        k8s_container_gauge: gauge receiving one sample per container status.
        pods_info: mapping of host IP -> list of PodInfo; mutated in place.
        service_endpoints: passed through to process_service_endpoints.
        vc_usage: per-VC gpu usage accumulator; mutated in place.

    Returns:
        None. Returns early (emitting no gauge samples) for pods that carry
        neither an "app" nor a "jobId" label. """

    pod_name = pod["metadata"]["name"]
    namespace = walk_json_field_safe(pod, "metadata", "namespace") or "default"
    host_ip = walk_json_field_safe(pod, "status", "hostIP") or "unscheduled"
    # Label values are strings; anything other than "True"/"true" (including
    # a missing label) means the pod is not preemptable.
    preemptable_str = walk_json_field_safe(pod, "metadata", "labels",
                                           "preemptionAllowed") or False
    preemptable = preemptable_str in ("True", "true")

    containers = walk_json_field_safe(pod, "spec", "containers") or []
    # A container's gpu consumption is the larger of its request and limit.
    used_gpu = sum(
        max(
            int(walk_json_field_safe(container, "resources", "requests",
                                     "nvidia.com/gpu") or 0),
            int(walk_json_field_safe(container, "resources", "limits",
                                     "nvidia.com/gpu") or 0))
        for container in containers)

    # A finished pod no longer occupies its gpus. The phase is a pod-level
    # field, so the check is hoisted out of the per-container loop; the
    # original only flipped this flag when the pod had containers, which the
    # `containers and` guard preserves.
    pod_phase = walk_json_field_safe(pod, "status", "phase")
    using_gpu = not (containers and pod_phase in ("Succeeded", "Failed"))

    vc = walk_json_field_safe(pod, "metadata", "labels", "vcName")

    # NOTE: `preemptable` is always a bool here, so the original
    # `preemptable is not None` test was vacuous and has been dropped.
    if vc is not None and using_gpu:
        pods_info[host_ip].append(PodInfo(pod_name, preemptable, used_gpu))
        gpu_type = walk_json_field_safe(pod, "metadata", "labels",
                                        "gpuType") or ""

        if used_gpu != 0:
            if gpu_type == "":
                logger.warning(
                    "did not find gpuType labels in pod %s, may cause gpu unavailable",
                    pod_name)

            if preemptable:
                vc_usage.add_preemptable_used(vc, gpu_type, used_gpu)
            else:
                vc_usage.add_used(vc, gpu_type, used_gpu)

    labels = pod["metadata"].get("labels")
    if labels is None or ("app" not in labels and "jobId" not in labels):
        logger.debug("unknown pod %s with labels %s", pod["metadata"]["name"],
                     labels)
        return None

    # get service name from label; "app" wins over "jobId".
    # NOTE(review): an empty-string "app" label would make service_name fall
    # back to jobId while pod_type stays "service" — preserved as-is.
    service_name = labels.get("app") or labels.get("jobId")
    pod_type = "service" if labels.get("app") is not None else "job"

    annotations = walk_json_field_safe(pod, "metadata", "annotations") or {}
    if host_ip != "unscheduled":
        process_service_endpoints(service_name, host_ip, annotations,
                                  service_endpoints)

    status = pod["status"]
    phase = _pod_phase(pod, status)
    initialized, pod_scheduled, ready = _pod_conditions(status, pod_name)

    k8s_pod_gauge.add_metric([
        service_name, pod_name, namespace, phase, host_ip, initialized,
        pod_scheduled, ready, pod_type
    ], 1)

    _emit_container_metrics(k8s_container_gauge, status, service_name,
                            pod_name, namespace, host_ip)


def _pod_phase(pod, status):
    """ return the pod's phase string (lowercased), mapping pods that are
    being deleted to "terminating", or "terminating-timeout" once the
    deletion grace period has elapsed """
    deletion_timestamp = walk_json_field_safe(pod, "metadata",
                                              "deletionTimestamp")
    grace_period = walk_json_field_safe(pod, "metadata",
                                        "deletionGracePeriodSeconds") or 0

    if deletion_timestamp is not None:
        now = datetime.datetime.now(datetime.timezone.utc)
        end = datetime.datetime.strptime(
            deletion_timestamp,
            "%Y-%m-%dT%H:%M:%S%z") + datetime.timedelta(seconds=grace_period)
        # Still present past its grace period: the deletion is stuck.
        return "terminating-timeout" if end < now else "terminating"

    if status.get("phase") is not None:
        return status["phase"].lower()
    return "unknown"


def _pod_conditions(status, pod_name):
    """ return (initialized, pod_scheduled, ready) condition statuses as
    lowercased strings, defaulting each to "unknown" when absent """
    initialized = pod_scheduled = ready = "unknown"

    for cond in status.get("conditions") or []:
        cond_t = cond["type"] # Initialized|Ready|PodScheduled
        cond_status = cond["status"].lower()

        if cond_t == "Initialized":
            initialized = cond_status
        elif cond_t == "PodScheduled":
            pod_scheduled = cond_status
        elif cond_t == "Ready":
            ready = cond_status
        else:
            error_counter.labels(type="unknown_pod_cond").inc()
            logger.debug("unexpected condition %s in pod %s", cond_t,
                         pod_name)

    return initialized, pod_scheduled, ready


def _emit_container_metrics(k8s_container_gauge, status, service_name,
                            pod_name, namespace, host_ip):
    """ add one k8s_container_gauge sample per containerStatuses entry """
    for container_status in status.get("containerStatuses") or []:
        container_name = container_status["name"]

        ready = container_status.get("ready")
        if ready is None:
            ready = False

        container_state = None
        state = container_status.get("state")
        if state is not None:
            # k8s reports exactly one state key (waiting/running/terminated);
            # anything else is unexpected.
            if len(state) != 1:
                error_counter.labels(
                    type="unexpected_container_state").inc()
                logger.error("unexpected state %s in container %s",
                             json.dumps(state), container_name)
            else:
                container_state = list(state.keys())[0].lower()

        k8s_container_gauge.add_metric([
            service_name, pod_name, container_name, namespace,
            container_state, host_ip,
            str(ready).lower()
        ], 1)