in jupyterlab_orbit/jupyterlab_orbit/handlers/containers.py [0:0]
def _dump_pod(clist) -> str:
data: List[Dict[str, str]] = []
for c in clist:
container: Dict[str, str] = dict()
container["name"] = c["metadata"]["name"]
if "app" in c["metadata"]["labels"]:
container["pod_app"] = c["metadata"]["labels"]["app"]
if "emr-spark" == c["metadata"]["labels"]["app"]:
container["job_name"] = c["metadata"]["labels"]["emr-containers.amazonaws.com/job.id"]
else:
container["job_name"] = c["metadata"]["labels"]["job-name"]
container["time"] = c["metadata"]["creationTimestamp"]
response_datetime_format = "%Y-%m-%dT%H:%M:%SZ"
if "status" in c:
# Succeeded / Completed
constainer_phase_status = (c["status"]["phase"]).lower()
if constainer_phase_status in ["succeeded", "failed"]:
if container["pod_app"] == "emr-spark":
container_status = [
cs for cs in c["status"]["containerStatuses"] if "spark-kubernetes-driver" == cs["name"]
][0]
else:
container_status = c["status"]["containerStatuses"][0]
completion_dt = datetime.strptime(
container_status["state"]["terminated"]["finishedAt"],
response_datetime_format,
)
start_dt = datetime.strptime(
container_status["state"]["terminated"]["startedAt"],
response_datetime_format,
)
duration = completion_dt - start_dt
container["duration"] = str(duration)
container["completionTime"] = (
container_status["state"]["terminated"]["finishedAt"]
if constainer_phase_status == "succeeded"
else ""
)
container["job_state"] = constainer_phase_status
elif constainer_phase_status == "running":
started_at = datetime.strptime(c["status"]["startTime"], response_datetime_format)
duration = datetime.utcnow() - started_at
container["duration"] = str(duration).split(".")[0]
container["completionTime"] = ""
container["job_state"] = constainer_phase_status
else:
container["completionTime"] = ""
container["duration"] = ""
container["job_state"] = "unknown"
else:
container["completionTime"] = ""
container["duration"] = ""
container["job_state"] = "unknown"
if container["pod_app"] == "emr-spark":
container_task = [ct for ct in c["spec"]["containers"] if "spark-kubernetes-driver" == ct["name"]][0]
container["hint"] = json.dumps(container_task, indent=4)
container["tasks"] = container_task["args"]
container["notebook"] = container_task["args"][-2].split("/")[-1]
container["container_name"] = "spark-kubernetes-driver"
else:
envs = c["spec"]["containers"][0]["env"]
tasks = json.loads([e["value"] for e in envs if e["name"] == "tasks"][0])
container["hint"] = json.dumps(tasks, indent=4)
container["tasks"] = tasks["tasks"]
container["notebook"] = (
tasks["tasks"][0]["notebookName"]
if "notebookName" in tasks["tasks"][0]
else f'{tasks["tasks"][0]["moduleName"]}.{tasks["tasks"][0]["functionName"]}'
)
container["container_name"] = ""
if "labels" in c["metadata"]:
container["node_type"] = (
c["metadata"]["labels"]["orbit/node-type"]
if "orbit/node-type" in c["metadata"]["labels"]
else "unknown"
)
container["job_type"] = (
c["metadata"]["labels"]["app"] if "app" in c["metadata"]["labels"] else "unknown"
)
if container["job_state"] == "running":
container["rank"] = 1
else:
container["rank"] = 2
container["info"] = c
data.append(container)
data = sorted(
data,
key=lambda i: (
i["rank"],
i["creationTimestamp"] if "creationTimestamp" in i else i["name"],
),
)
return json.dumps(data)