in sdk/aws_orbit_sdk/controller.py [0:0]
def tail_logs(team_name, tasks) -> None:
    """Stream the logs of the pods behind each task to the module logger.

    For every task, the pods labelled ``job-name=<task["Identifier"]>`` are
    listed in the namespace taken from the ``AWS_ORBIT_USER_SPACE`` environment
    variable, falling back to ``team_name`` when it is not set. Each container
    that reports itself as started, running or terminated has its pod log
    streamed line by line at INFO level; otherwise a "not started yet" message
    is logged.

    Args:
        team_name: Namespace to use when ``AWS_ORBIT_USER_SPACE`` is unset.
        tasks: Iterable of mappings, each providing an ``"Identifier"`` key
            whose value is used as the ``job-name`` label selector.

    Returns:
        None. Note that the function returns immediately, without inspecting
        the remaining pods or tasks, as soon as a pod condition has type
        ``"Failed"`` or reason ``"Unschedulable"``.
    """
    # Neither the namespace nor the API client depends on the task, so they
    # are resolved once and reused for every listing and log stream.
    namespace = os.environ.get("AWS_ORBIT_USER_SPACE", team_name)
    core_api = CoreV1Api()

    for task in tasks:
        task_id = task["Identifier"]
        _logger.info("Watching task: '%s'", task_id)
        current_pods: V1PodList = core_api.list_namespaced_pod(
            namespace=namespace, label_selector=f"job-name={task_id}"
        )
        for pod in current_pods.items:
            pod_instance: V1Pod = cast(V1Pod, pod)
            pod_name = pod_instance.metadata.name
            _logger.debug("pod: %s", pod_name)
            pod_status: V1PodStatus = cast(V1PodStatus, pod_instance.status)
            _logger.debug("pod s: %s", pod_status)

            # ``conditions`` may be None or empty; treat both as "nothing to check".
            for c in pod_status.conditions or []:
                condition: V1PodCondition = cast(V1PodCondition, c)
                if condition.type == "Failed" or condition.reason == "Unschedulable":
                    _logger.info("pod has error status %s , %s", condition.reason, condition.message)
                    # A broken pod aborts the whole tailing run, not just this task.
                    return

            for s in pod_status.container_statuses or []:
                container_status: V1ContainerStatus = cast(V1ContainerStatus, s)
                container_state: V1ContainerState = container_status.state
                _logger.debug("task status: %s ", container_status)

                # ``state`` is optional in the client model; guard it so a
                # container without state details is reported as not started
                # instead of raising AttributeError.
                state_is_active = container_state is not None and (
                    container_state.running or container_state.terminated
                )
                if container_status.started or state_is_active:
                    _logger.info("task %s status: %s", pod_name, container_state)
                    w = k8_watch.Watch()
                    for line in w.stream(core_api.read_namespaced_pod_log, name=pod_name, namespace=namespace):
                        _logger.info(line)
                else:
                    _logger.info("task not started yet for %s", task_id)