def main()

in tfx/orchestration/kubeflow/container_entrypoint.py [0:0]


def main(argv):
  """Launches a single pipeline node from inside a Kubeflow container.

  The flags in `argv` carry the JSON-serialized pipeline IR, the JSON-serialized
  Kubeflow metadata config, the id of the node to run and any runtime parameter
  values. The node is run through `launcher.Launcher`; afterwards the pipeline
  node, the launch result and the `--metadata_ui_path` value are handed to
  `_dump_ui_metadata`.

  Args:
    argv: Command-line arguments, passed unchanged to
      `argparse.ArgumentParser.parse_args`.
  """
  # Kubeflow Pipelines UI displays the container's stdout to the user, so the
  # logs are sent there.
  logging.basicConfig(stream=sys.stdout, level=logging.INFO)
  logging.getLogger().setLevel(logging.INFO)

  flag_parser = argparse.ArgumentParser()
  flag_parser.add_argument('--pipeline_root', type=str, required=True)
  flag_parser.add_argument(
      '--metadata_ui_path',
      type=str,
      required=False,
      default='/mlpipeline-ui-metadata.json')
  # Registered in this order so the parser's flag ordering stays the same.
  required_string_flags = (
      '--kubeflow_metadata_config',
      '--tfx_ir',
      '--node_id',
  )
  for flag_name in required_string_flags:
    flag_parser.add_argument(flag_name, type=str, required=True)
  # The flag may be repeated, once per runtime parameter; "append" gathers
  # every occurrence into a list of strings.
  flag_parser.add_argument('--runtime_parameter', type=str, action='append')

  # TODO(b/196892362): Replace hooking with a more straightforward mechanism.
  launcher._register_execution = _register_execution  # pylint: disable=protected-access

  flags = flag_parser.parse_args(argv)

  # Rebuild the pipeline IR proto from its JSON form, then resolve the runtime
  # parameters against it before anything else reads it.
  pipeline_ir = pipeline_pb2.Pipeline()
  json_format.Parse(flags.tfx_ir, pipeline_ir)
  _resolve_runtime_parameters(pipeline_ir, flags.runtime_parameter)

  local_deployment = runner_utils.extract_local_deployment_config(pipeline_ir)

  kfp_metadata_config = kubeflow_pb2.KubeflowMetadataConfig()
  json_format.Parse(flags.kubeflow_metadata_config, kfp_metadata_config)
  mlmd_connection_config = _get_metadata_connection_config(kfp_metadata_config)
  mlmd_connection = metadata.Metadata(mlmd_connection_config)

  # Attach necessary labels to distinguish different runner and DSL.
  # TODO(zhitaoli): Pass this from KFP runner side when the same container
  # entrypoint can be used by a different runner.
  runner_labels = {
      telemetry_utils.LABEL_TFX_RUNNER: 'kfp',
  }
  with telemetry_utils.scoped_labels(runner_labels):
    operators_by_spec_type = {
        executable_spec_pb2.ContainerExecutableSpec:
            kubernetes_executor_operator.KubernetesExecutorOperator
    }

    node_executor_spec = runner_utils.extract_executor_spec(
        local_deployment, flags.node_id)
    node_driver_spec = runner_utils.extract_custom_driver_spec(
        local_deployment, flags.node_id)

    pipeline_node = _get_pipeline_node(pipeline_ir, flags.node_id)
    node_launcher = launcher.Launcher(
        pipeline_node=pipeline_node,
        mlmd_connection=mlmd_connection,
        pipeline_info=pipeline_ir.pipeline_info,
        pipeline_runtime_spec=pipeline_ir.runtime_spec,
        executor_spec=node_executor_spec,
        custom_driver_spec=node_driver_spec,
        custom_executor_operators=operators_by_spec_type)
    logging.info('Component %s is running.', flags.node_id)
    launch_result = node_launcher.launch()
    logging.info('Component %s is finished.', flags.node_id)

  # Dump the UI metadata.
  _dump_ui_metadata(pipeline_node, launch_result, flags.metadata_ui_path)