def _dump_pod()

in jupyterlab_orbit/jupyterlab_orbit/handlers/containers.py [0:0]


    def _dump_pod(clist) -> str:
        data: List[Dict[str, str]] = []
        for c in clist:
            container: Dict[str, str] = dict()
            container["name"] = c["metadata"]["name"]
            if "app" in c["metadata"]["labels"]:
                container["pod_app"] = c["metadata"]["labels"]["app"]
                if "emr-spark" == c["metadata"]["labels"]["app"]:
                    container["job_name"] = c["metadata"]["labels"]["emr-containers.amazonaws.com/job.id"]
                else:
                    container["job_name"] = c["metadata"]["labels"]["job-name"]

            container["time"] = c["metadata"]["creationTimestamp"]
            response_datetime_format = "%Y-%m-%dT%H:%M:%SZ"

            if "status" in c:
                # Succeeded / Completed
                constainer_phase_status = (c["status"]["phase"]).lower()
                if constainer_phase_status in ["succeeded", "failed"]:
                    if container["pod_app"] == "emr-spark":
                        container_status = [
                            cs for cs in c["status"]["containerStatuses"] if "spark-kubernetes-driver" == cs["name"]
                        ][0]
                    else:
                        container_status = c["status"]["containerStatuses"][0]

                    completion_dt = datetime.strptime(
                        container_status["state"]["terminated"]["finishedAt"],
                        response_datetime_format,
                    )
                    start_dt = datetime.strptime(
                        container_status["state"]["terminated"]["startedAt"],
                        response_datetime_format,
                    )
                    duration = completion_dt - start_dt
                    container["duration"] = str(duration)
                    container["completionTime"] = (
                        container_status["state"]["terminated"]["finishedAt"]
                        if constainer_phase_status == "succeeded"
                        else ""
                    )
                    container["job_state"] = constainer_phase_status
                elif constainer_phase_status == "running":
                    started_at = datetime.strptime(c["status"]["startTime"], response_datetime_format)
                    duration = datetime.utcnow() - started_at
                    container["duration"] = str(duration).split(".")[0]
                    container["completionTime"] = ""
                    container["job_state"] = constainer_phase_status
                else:
                    container["completionTime"] = ""
                    container["duration"] = ""
                    container["job_state"] = "unknown"
            else:
                container["completionTime"] = ""
                container["duration"] = ""
                container["job_state"] = "unknown"

            if container["pod_app"] == "emr-spark":
                container_task = [ct for ct in c["spec"]["containers"] if "spark-kubernetes-driver" == ct["name"]][0]
                container["hint"] = json.dumps(container_task, indent=4)
                container["tasks"] = container_task["args"]
                container["notebook"] = container_task["args"][-2].split("/")[-1]
                container["container_name"] = "spark-kubernetes-driver"
            else:
                envs = c["spec"]["containers"][0]["env"]
                tasks = json.loads([e["value"] for e in envs if e["name"] == "tasks"][0])
                container["hint"] = json.dumps(tasks, indent=4)
                container["tasks"] = tasks["tasks"]
                container["notebook"] = (
                    tasks["tasks"][0]["notebookName"]
                    if "notebookName" in tasks["tasks"][0]
                    else f'{tasks["tasks"][0]["moduleName"]}.{tasks["tasks"][0]["functionName"]}'
                )
                container["container_name"] = ""
            if "labels" in c["metadata"]:
                container["node_type"] = (
                    c["metadata"]["labels"]["orbit/node-type"]
                    if "orbit/node-type" in c["metadata"]["labels"]
                    else "unknown"
                )
                container["job_type"] = (
                    c["metadata"]["labels"]["app"] if "app" in c["metadata"]["labels"] else "unknown"
                )
            if container["job_state"] == "running":
                container["rank"] = 1
            else:
                container["rank"] = 2
            container["info"] = c
            data.append(container)
        data = sorted(
            data,
            key=lambda i: (
                i["rank"],
                i["creationTimestamp"] if "creationTimestamp" in i else i["name"],
            ),
        )
        return json.dumps(data)