def stream_logs()

in xlml/utils/gke.py [0:0]


  def stream_logs(name: str):
    def _watch_pod(name, namespace) -> Optional[int]:
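      # Wait for the pod to leave the Pending phase, stream its logs, then
      # return the container's exit code (None if it cannot be determined).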
      logs_watcher = kubernetes.watch.Watch()

      logging.info(f'Waiting for pod {name} to start...')
      pod_watcher = kubernetes.watch.Watch()
      for event in pod_watcher.stream(
          core_api.list_namespaced_pod,
          namespace,
          field_selector=f'metadata.name={name}',
      ):
        status = event['object'].status
        logging.info(
            f'Pod {event["object"].metadata.name} status: {status.phase}'
        )
        if status.phase != 'Pending':
          break

      logging.info(f'Streaming pod logs for {name}...')
      for line in logs_watcher.stream(
          core_api.read_namespaced_pod_log,
          name,
          namespace,
          _request_timeout=3600,
      ):
        logging.info(f'{name}] {line}')

      logging.warning(f'Lost logs stream for {name}.')

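      # The log stream ended (the job finished or the request timed out);
      # read the pod again to recover the container's exit code.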
      pod = core_api.read_namespaced_pod(namespace='default', name=name)
      if pod.status.container_statuses:
        container_status = pod.status.container_statuses[0]
        if container_status.state.terminated:
          exit_code = container_status.state.terminated.exit_code
          if exit_code:
            logging.error(f'Pod {name} had non-zero exit code {exit_code}')

          return exit_code

      logging.warning(f'Unknown status for pod {name}')
      return None

    # We need to re-authenticate if stream_logs fails. This can happen when
    # the job runs for too long and the credentials expire.
    client = get_authenticated_client(gcp.project_name, gcp.zone, cluster_name)

    batch_api = kubernetes.client.BatchV1Api(client)
    core_api = kubernetes.client.CoreV1Api(client)
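    # Pods created by a Job are labeled with batch.kubernetes.io/job-name,
    # so select on that label to find every pod belonging to this job.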
    pod_label_selector = f'batch.kubernetes.io/job-name={name}'
    pods = core_api.list_namespaced_pod(
        namespace='default', label_selector=pod_label_selector
    )
    # TODO(piz): Using time.sleep may not be a good solution here. However, all
    # resources are expected to be ready after the wait_all_pods_ready stage;
    # this is just in case authentication takes time. Check with Will for
    # better solutions.
    time.sleep(30)
    if len(pods.items) != body['spec']['parallelism']:
      logging.info('Waiting for all pods to be re-connected...')
      raise PodsNotReadyError('Pods are not ready after refreshing credentials.')

    with concurrent.futures.ThreadPoolExecutor() as executor:
      futures = []
      for pod in pods.items:
        f = executor.submit(
            _watch_pod, pod.metadata.name, pod.metadata.namespace
        )
        futures.append(f)

      # Wait for pods to complete, and exit with the first non-zero exit code.
      for f in concurrent.futures.as_completed(futures):
        try:
          # TODO(piz/wcromar): there appears to be a delay between as_completed
          # and the update of f.result(), so exit_code can be None even when
          # the task is complete.
          exit_code = f.result()
        except kubernetes.client.ApiException as e:
          logging.error('Kubernetes error. Retrying...', exc_info=e)
          exit_code = None

        # Raise so the caller retries when the exit code is unknown.
        if exit_code is None:
          raise RuntimeError('unknown exit code')
        if exit_code:
          raise RuntimeError('Non-zero exit code')
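

Every failure path above surfaces as an exception: PodsNotReadyError when the pods have not re-connected after re-authentication, and RuntimeError when an exit code is unknown or non-zero. The caller is therefore expected to retry stream_logs, which re-authenticates on each attempt. Below is a minimal sketch of such a wrapper; stream_logs_with_retries, the attempt count, and the sleep interval are illustrative assumptions, not part of the source.


  import logging
  import time


  def stream_logs_with_retries(name: str, max_attempts: int = 3) -> None:
    """Hypothetical wrapper: retry stream_logs until it finishes cleanly."""
    for attempt in range(1, max_attempts + 1):
      try:
        # stream_logs re-authenticates internally, so each new call also
        # refreshes expired credentials before re-attaching to the pods.
        stream_logs(name)
        return
      except (PodsNotReadyError, RuntimeError) as e:
        logging.warning(f'stream_logs attempt {attempt} failed: {e}')
        if attempt == max_attempts:
          raise
        # Illustrative back-off before the next attempt.
        time.sleep(60)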