in xlml/utils/gke.py [0:0]
# Imports used by this function (defined at module level in the real file).
import concurrent.futures
import logging
import time
from typing import Optional

import kubernetes


def stream_logs(name: str):
  """Streams logs from all pods of job `name` until they terminate.

  Relies on names bound in its enclosing scope in xlml/utils/gke.py:
  `gcp`, `cluster_name`, `body`, `get_authenticated_client`, and
  `PodsNotReadyError`.
  """

  def _watch_pod(name, namespace) -> Optional[int]:
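    # Returns the first container's exit code once the pod has terminated,
    # or None when the outcome cannot be determined; the caller treats
    # None as an unknown status and retries.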
    logs_watcher = kubernetes.watch.Watch()

    logging.info(f'Waiting for pod {name} to start...')
    pod_watcher = kubernetes.watch.Watch()
    for event in pod_watcher.stream(
        core_api.list_namespaced_pod,
        namespace,
        field_selector=f'metadata.name={name}',
    ):
      status = event['object'].status
      logging.info(
          f'Pod {event["object"].metadata.name} status: {status.phase}'
      )
      if status.phase != 'Pending':
        break
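    # Watch.stream wraps the log request in follow mode and yields one log
    # line per iteration; _request_timeout caps a silent connection at one
    # hour rather than letting it hang forever.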
    logging.info(f'Streaming pod logs for {name}...')
    for line in logs_watcher.stream(
        core_api.read_namespaced_pod_log,
        name,
        namespace,
        _request_timeout=3600,
    ):
      logging.info(f'[{name}] {line}')
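    # The stream ends whenever the connection drops or times out, not
    # necessarily because the pod finished, so inspect the pod status
    # directly below.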
    logging.warning(f'Lost logs stream for {name}.')

    pod = core_api.read_namespaced_pod(namespace='default', name=name)
    if pod.status.container_statuses:
      container_status = pod.status.container_statuses[0]
      if container_status.state.terminated:
        exit_code = container_status.state.terminated.exit_code
        if exit_code:
          logging.error(f'Pod {name} had non-zero exit code {exit_code}')
        return exit_code

    logging.warning(f'Unknown status for pod {name}')
    return None
  # We need to re-authenticate if stream_logs fails. This can happen when
  # the job runs for too long and the credentials expire.
  client = get_authenticated_client(gcp.project_name, gcp.zone, cluster_name)
  batch_api = kubernetes.client.BatchV1Api(client)
  core_api = kubernetes.client.CoreV1Api(client)
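  # The Job controller labels every pod it creates with its job name, so
  # this selector matches exactly the pods belonging to job `name`.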
  pod_label_selector = f'batch.kubernetes.io/job-name={name}'
  pods = core_api.list_namespaced_pod(
      namespace='default', label_selector=pod_label_selector
  )
  # TODO(piz): Using time.sleep may not be a good solution here. However, I
  # expect all resources to be ready by the wait_all_pods_ready stage; this
  # is just in case authentication takes time. Check with Will for better
  # solutions.
  time.sleep(30)
  if len(pods.items) != body['spec']['parallelism']:
    logging.info('Waiting for all pods to be re-connected...')
    raise PodsNotReadyError('Pods are not ready after refreshing credentials.')
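  # Fan out one log-streaming worker per pod; each returns that pod's
  # container exit code (or None if it could not be determined).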
  with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for pod in pods.items:
      f = executor.submit(
          _watch_pod, pod.metadata.name, pod.metadata.namespace
      )
      futures.append(f)
    # Wait for pods to complete, and exit with the first non-zero exit code.
    for f in concurrent.futures.as_completed(futures):
      try:
        # TODO(piz/wcromar): There appears to be a delay between as_completed
        # and the update of f.result(); exit_code can be None even when the
        # task is complete.
        exit_code = f.result()
      except kubernetes.client.ApiException as e:
        logging.error('Kubernetes error. Retrying...', exc_info=e)
        exit_code = None
      # Raise so the surrounding task retries when the status is unknown.
      if exit_code is None:
        raise RuntimeError('unknown exit code')
      if exit_code:
        raise RuntimeError('Non-zero exit code')
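
# `get_authenticated_client` is defined elsewhere in this module and its
# source is not shown here. For orientation only: a minimal sketch of the
# usual pattern for building an authenticated Kubernetes client against a
# GKE cluster. Everything below beyond the public google-auth, GKE, and
# kubernetes-client APIs is an assumption, not the module's actual
# implementation.
import google.auth
import google.auth.transport.requests
from google.cloud import container_v1


def _get_authenticated_client_sketch(project: str, zone: str, cluster: str):
  # Hypothetical stand-in for this module's get_authenticated_client.
  # Look up the cluster endpoint through the GKE control-plane API.
  manager = container_v1.ClusterManagerClient()
  cluster_info = manager.get_cluster(
      name=f'projects/{project}/locations/{zone}/clusters/{cluster}'
  )
  # Mint a fresh OAuth token; this is the step that matters when a
  # long-running job outlives the previous credentials.
  creds, _ = google.auth.default()
  creds.refresh(google.auth.transport.requests.Request())
  configuration = kubernetes.client.Configuration()
  configuration.host = f'https://{cluster_info.endpoint}'
  configuration.api_key_prefix['authorization'] = 'Bearer'
  configuration.api_key['authorization'] = creds.token
  # A real implementation should verify TLS against the cluster CA from
  # cluster_info.master_auth.cluster_ca_certificate instead of disabling it.
  configuration.verify_ssl = False
  return kubernetes.client.ApiClient(configuration)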