tfx/orchestration/launcher/kubernetes_component_launcher.py [88:161]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    # Resolve the namespace to launch the executor pod in; fall back to
    # 'kubeflow' when the KFP namespace lookup raises RuntimeError.
    try:
      namespace = kube_utils.get_kfp_namespace()
    except RuntimeError:
      namespace = 'kubeflow'

    pod_manifest = self._build_pod_manifest(pod_name, container_spec)
    core_api = kube_utils.make_core_v1_api()

    if kube_utils.is_inside_kfp():
      # Inside KFP: copy the service account and the owner references of the
      # current KFP pod onto the executor pod manifest.
      launcher_pod = kube_utils.get_current_kfp_pod(core_api)
      pod_manifest['spec']['serviceAccount'] = launcher_pod.spec.service_account
      pod_manifest['spec'][
          'serviceAccountName'] = launcher_pod.spec.service_account_name
      pod_manifest['metadata'][
          'ownerReferences'] = container_common.to_swagger_dict(
              launcher_pod.metadata.owner_references)
    else:
      # Outside KFP: run the executor pod under the default TFX service account.
      pod_manifest['spec']['serviceAccount'] = kube_utils.TFX_SERVICE_ACCOUNT
      pod_manifest['spec'][
          'serviceAccountName'] = kube_utils.TFX_SERVICE_ACCOUNT

    # Only create the pod when one with this name is not already present, so
    # that an existing pod is waited on instead of being created again.
    logging.info('Looking for pod "%s:%s".', namespace, pod_name)
    if not kube_utils.get_pod(core_api, pod_name, namespace):
      logging.info('Pod "%s:%s" does not exist. Creating it...',
                   namespace, pod_name)
      logging.info('Pod manifest: %s', pod_manifest)
      try:
        core_api.create_namespaced_pod(namespace=namespace, body=pod_manifest)
      except client.rest.ApiException as e:
        # Chain the API error so the original traceback is preserved.
        raise RuntimeError(
            'Failed to create container executor pod!\nReason: %s\nBody: %s' %
            (e.reason, e.body)) from e

    # Wait up to 300 seconds for the pod to move from pending to another status.
    logging.info('Waiting for pod "%s:%s" to start.', namespace, pod_name)
    kube_utils.wait_pod(
        core_api,
        pod_name,
        namespace,
        exit_condition_lambda=kube_utils.pod_is_not_pending,
        condition_description='non-pending status',
        timeout_sec=300)

    logging.info('Start log streaming for pod "%s:%s".', namespace, pod_name)
    try:
      logs = core_api.read_namespaced_pod_log(
          name=pod_name,
          namespace=namespace,
          container=kube_utils.ARGO_MAIN_CONTAINER_NAME,
          follow=True,
          _preload_content=False).stream()
    except client.rest.ApiException as e:
      raise RuntimeError(
          'Failed to stream the logs from the pod!\nReason: %s\nBody: %s' %
          (e.reason, e.body)) from e

    for log in logs:
      # Raw stream chunks may split a multi-byte character or carry non-UTF-8
      # bytes; replace undecodable bytes rather than aborting the launcher
      # with a UnicodeDecodeError while the pod itself is still running.
      logging.info('%s', log.decode(errors='replace').rstrip('\n'))

    # Wait indefinitely for the pod to complete.
    resp = kube_utils.wait_pod(
        core_api,
        pod_name,
        namespace,
        exit_condition_lambda=kube_utils.pod_is_done,
        condition_description='done state')

    # A pod that finished in the FAILED phase is surfaced as a RuntimeError.
    if resp.status.phase == kube_utils.PodPhase.FAILED.value:
      raise RuntimeError('Pod "%s:%s" failed with status "%s".' %
                         (namespace, pod_name, resp.status))

    logging.info('Pod "%s:%s" is done.', namespace, pod_name)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



tfx/orchestration/portable/kubernetes_executor_operator.py [85:158]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    # Resolve the namespace to launch the executor pod in; fall back to
    # 'kubeflow' when the KFP namespace lookup raises RuntimeError.
    try:
      namespace = kube_utils.get_kfp_namespace()
    except RuntimeError:
      namespace = 'kubeflow'

    pod_manifest = self._build_pod_manifest(pod_name, container_spec)
    core_api = kube_utils.make_core_v1_api()

    if kube_utils.is_inside_kfp():
      # Inside KFP: copy the service account and the owner references of the
      # current KFP pod onto the executor pod manifest.
      launcher_pod = kube_utils.get_current_kfp_pod(core_api)
      pod_manifest['spec']['serviceAccount'] = launcher_pod.spec.service_account
      pod_manifest['spec'][
          'serviceAccountName'] = launcher_pod.spec.service_account_name
      pod_manifest['metadata'][
          'ownerReferences'] = container_common.to_swagger_dict(
              launcher_pod.metadata.owner_references)
    else:
      # Outside KFP: run the executor pod under the default TFX service account.
      pod_manifest['spec']['serviceAccount'] = kube_utils.TFX_SERVICE_ACCOUNT
      pod_manifest['spec'][
          'serviceAccountName'] = kube_utils.TFX_SERVICE_ACCOUNT

    # Only create the pod when one with this name is not already present, so
    # that an existing pod is waited on instead of being created again.
    logging.info('Looking for pod "%s:%s".', namespace, pod_name)
    if not kube_utils.get_pod(core_api, pod_name, namespace):
      logging.info('Pod "%s:%s" does not exist. Creating it...',
                   namespace, pod_name)
      logging.info('Pod manifest: %s', pod_manifest)
      try:
        core_api.create_namespaced_pod(namespace=namespace, body=pod_manifest)
      except client.rest.ApiException as e:
        # Chain the API error so the original traceback is preserved.
        raise RuntimeError(
            'Failed to create container executor pod!\nReason: %s\nBody: %s' %
            (e.reason, e.body)) from e

    # Wait up to 300 seconds for the pod to move from pending to another status.
    logging.info('Waiting for pod "%s:%s" to start.', namespace, pod_name)
    kube_utils.wait_pod(
        core_api,
        pod_name,
        namespace,
        exit_condition_lambda=kube_utils.pod_is_not_pending,
        condition_description='non-pending status',
        timeout_sec=300)

    logging.info('Start log streaming for pod "%s:%s".', namespace, pod_name)
    try:
      logs = core_api.read_namespaced_pod_log(
          name=pod_name,
          namespace=namespace,
          container=kube_utils.ARGO_MAIN_CONTAINER_NAME,
          follow=True,
          _preload_content=False).stream()
    except client.rest.ApiException as e:
      raise RuntimeError(
          'Failed to stream the logs from the pod!\nReason: %s\nBody: %s' %
          (e.reason, e.body)) from e

    for log in logs:
      # Raw stream chunks may split a multi-byte character or carry non-UTF-8
      # bytes; replace undecodable bytes rather than aborting the operator
      # with a UnicodeDecodeError while the pod itself is still running.
      logging.info('%s', log.decode(errors='replace').rstrip('\n'))

    # Wait indefinitely for the pod to complete.
    resp = kube_utils.wait_pod(
        core_api,
        pod_name,
        namespace,
        exit_condition_lambda=kube_utils.pod_is_done,
        condition_description='done state')

    # A pod that finished in the FAILED phase is surfaced as a RuntimeError.
    if resp.status.phase == kube_utils.PodPhase.FAILED.value:
      raise RuntimeError('Pod "%s:%s" failed with status "%s".' %
                         (namespace, pod_name, resp.status))

    logging.info('Pod "%s:%s" is done.', namespace, pod_name)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



