in src/docker-images/watchdog/src/watchdog.py [0:0]
def parse_pod_item(pod, k8s_pod_gauge, k8s_container_gauge, pods_info,
                   service_endpoints, vc_usage):
    """Add metrics to k8s_pod_gauge / k8s_container_gauge for one parsed pod.

    Because we are parsing JSON output by k8s, its format is subject to
    change, so we test whether a field exists before accessing it to avoid
    KeyError.

    Side effects, in order:
      * if the pod has a ``vcName`` label and its phase is not
        Succeeded/Failed, a ``PodInfo`` is appended to
        ``pods_info[host_ip]``; if it also uses GPUs, the count is recorded
        in ``vc_usage`` (preemptable or not);
      * if the pod has an ``app`` or ``jobId`` label, its service endpoints
        are processed (scheduled pods only), one sample is added to
        ``k8s_pod_gauge``, and one sample per container status is added to
        ``k8s_container_gauge``.

    Args:
        pod: one pod object from the k8s pod list JSON (dict).
        k8s_pod_gauge: metric family receiving per-pod samples via
            ``add_metric``.
        k8s_container_gauge: metric family receiving per-container samples
            via ``add_metric``.
        pods_info: mapping host_ip -> list of PodInfo; it is indexed
            directly, so presumably a ``defaultdict(list)`` -- confirm with
            caller.
        service_endpoints: passed through to process_service_endpoints.
        vc_usage: accumulator exposing ``add_used`` and
            ``add_preemptable_used``.

    Returns:
        None.
    """
    pod_name = pod["metadata"]["name"]
    namespace = walk_json_field_safe(pod, "metadata", "namespace") or "default"
    host_ip = walk_json_field_safe(pod, "status", "hostIP") or "unscheduled"

    # Anything other than the string "True"/"true" (including a missing
    # label) means the pod is not preemptable.
    preemptable_str = walk_json_field_safe(pod, "metadata", "labels",
                                           "preemptionAllowed") or False
    preemptable = preemptable_str in ("True", "true")

    # A container's GPU footprint is the larger of its request and limit.
    used_gpu = 0
    for container in walk_json_field_safe(pod, "spec", "containers") or []:
        req_gpu = int(
            walk_json_field_safe(container, "resources", "requests",
                                 "nvidia.com/gpu") or 0)
        limit_gpu = int(
            walk_json_field_safe(container, "resources", "limits",
                                 "nvidia.com/gpu") or 0)
        used_gpu += max(req_gpu, limit_gpu)

    # Finished pods are not counted as holding GPUs.
    raw_phase = walk_json_field_safe(pod, "status", "phase")
    using_gpu = raw_phase not in ("Succeeded", "Failed")

    vc = walk_json_field_safe(pod, "metadata", "labels", "vcName")
    if vc is not None and using_gpu:
        pods_info[host_ip].append(PodInfo(pod_name, preemptable, used_gpu))

        gpu_type = walk_json_field_safe(pod, "metadata", "labels", "gpuType")
        if gpu_type is None:
            gpu_type = ""

        if used_gpu != 0:
            if gpu_type == "":
                logger.warning(
                    "did not find gpuType labels in pod %s, may cause gpu unavailable",
                    pod_name)
            if preemptable:
                vc_usage.add_preemptable_used(vc, gpu_type, used_gpu)
            else:
                vc_usage.add_used(vc, gpu_type, used_gpu)

    labels = pod["metadata"].get("labels")
    if labels is None or ("app" not in labels and "jobId" not in labels):
        logger.debug("unknown pod %s with labels %s", pod_name, labels)
        return None

    # Pods labelled "app" are services; the rest are jobs keyed by jobId.
    service_name = labels.get("app") or labels.get("jobId")
    pod_type = "service" if labels.get("app") is not None else "job"

    annotations = walk_json_field_safe(pod, "metadata", "annotations") or {}
    if host_ip != "unscheduled":
        process_service_endpoints(service_name, host_ip, annotations,
                                  service_endpoints)

    # The lookups above already tolerate a missing "status"; do the same
    # here instead of raising KeyError.
    status = pod.get("status") or {}

    deletion_timestamp = walk_json_field_safe(pod, "metadata",
                                              "deletionTimestamp")
    grace_period = walk_json_field_safe(pod, "metadata",
                                        "deletionGracePeriodSeconds") or 0
    if deletion_timestamp is not None:
        # The pod is being deleted; report a timeout once the deletion
        # timestamp plus the grace period lies in the past.
        # NOTE(review): the k8s API documents deletionTimestamp as already
        # including the grace period, so adding it again may double the
        # allowance -- confirm before tightening.
        now = datetime.datetime.now(datetime.timezone.utc)
        end = datetime.datetime.strptime(
            deletion_timestamp,
            "%Y-%m-%dT%H:%M:%S%z") + datetime.timedelta(seconds=grace_period)
        phase = "terminating-timeout" if end < now else "terminating"
    elif status.get("phase") is not None:
        phase = status["phase"].lower()
    else:
        phase = "unknown"

    initialized = pod_scheduled = ready = "unknown"
    for cond in status.get("conditions") or []:
        cond_t = cond["type"]  # Initialized|Ready|PodScheduled|ContainersReady
        cond_status = cond["status"].lower()
        if cond_t == "Initialized":
            initialized = cond_status
        elif cond_t == "PodScheduled":
            pod_scheduled = cond_status
        elif cond_t == "Ready":
            ready = cond_status
        elif cond_t == "ContainersReady":
            # Standard k8s condition with no label in the pod gauge; it is
            # expected, so it must not be counted as an error.
            pass
        else:
            error_counter.labels(type="unknown_pod_cond").inc()
            logger.debug("unexpected condition %s in pod %s", cond_t,
                         pod_name)

    k8s_pod_gauge.add_metric([
        service_name, pod_name, namespace, phase, host_ip, initialized,
        pod_scheduled, ready, pod_type
    ], 1)

    # generate k8s_containers
    for container_status in status.get("containerStatuses") or []:
        container_name = container_status["name"]

        container_ready = container_status.get("ready")
        if container_ready is None:
            container_ready = False

        container_state = None
        state = container_status.get("state")
        if state is not None:
            # Exactly one state key is expected; anything else is counted
            # as an error and the state is reported as None.
            if len(state) != 1:
                error_counter.labels(
                    type="unexpected_container_state").inc()
                logger.error("unexpected state %s in container %s",
                             json.dumps(state), container_name)
            else:
                container_state = next(iter(state)).lower()

        k8s_container_gauge.add_metric([
            service_name, pod_name, container_name, namespace,
            container_state, host_ip,
            str(container_ready).lower()
        ], 1)