in tfx/orchestration/kubeflow/container_entrypoint.py [0:0]
def main(argv):
  """Runs a single pipeline node inside a Kubeflow Pipelines container.

  Parses the command-line flags, rebuilds the pipeline IR and the Kubeflow
  metadata config from their JSON flag values, launches the node selected by
  `--node_id`, and finally dumps the UI metadata to `--metadata_ui_path`.

  Note that `--pipeline_root` is a required flag but its value is not read
  anywhere in this function.

  Args:
    argv: Command-line arguments, handed as-is to `ArgumentParser.parse_args`.
  """
  # Send logs to the container's stdout; that is where the Kubeflow Pipelines
  # UI picks them up to show to the user.
  logging.basicConfig(stream=sys.stdout, level=logging.INFO)
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)

  arg_parser = argparse.ArgumentParser()
  arg_parser.add_argument('--pipeline_root', type=str, required=True)
  arg_parser.add_argument(
      '--metadata_ui_path',
      type=str,
      required=False,
      default='/mlpipeline-ui-metadata.json')
  for required_flag in ('--kubeflow_metadata_config', '--tfx_ir', '--node_id'):
    arg_parser.add_argument(required_flag, type=str, required=True)
  # The flag may be repeated; "append" collects every occurrence so that
  # `flags.runtime_parameter` ends up as a List[str].
  arg_parser.add_argument('--runtime_parameter', type=str, action='append')

  # TODO(b/196892362): Replace hooking with a more straightforward mechanism.
  launcher._register_execution = _register_execution  # pylint: disable=protected-access

  flags = arg_parser.parse_args(argv)

  # json_format.Parse fills in and returns the message it is given.
  pipeline_ir = json_format.Parse(flags.tfx_ir, pipeline_pb2.Pipeline())
  _resolve_runtime_parameters(pipeline_ir, flags.runtime_parameter)

  local_deployment = runner_utils.extract_local_deployment_config(pipeline_ir)

  mlmd_config = json_format.Parse(flags.kubeflow_metadata_config,
                                  kubeflow_pb2.KubeflowMetadataConfig())
  mlmd_handle = metadata.Metadata(_get_metadata_connection_config(mlmd_config))

  target_node_id = flags.node_id

  # Attach necessary labels to distinguish different runner and DSL.
  # TODO(zhitaoli): Pass this from KFP runner side when the same container
  # entrypoint can be used by a different runner.
  runner_labels = {telemetry_utils.LABEL_TFX_RUNNER: 'kfp'}
  with telemetry_utils.scoped_labels(runner_labels):
    # Container executable specs are handled by the Kubernetes operator.
    operator_overrides = {
        executable_spec_pb2.ContainerExecutableSpec:
            kubernetes_executor_operator.KubernetesExecutorOperator
    }

    node_executor_spec = runner_utils.extract_executor_spec(
        local_deployment, target_node_id)
    node_driver_spec = runner_utils.extract_custom_driver_spec(
        local_deployment, target_node_id)

    pipeline_node = _get_pipeline_node(pipeline_ir, target_node_id)
    node_launcher = launcher.Launcher(
        pipeline_node=pipeline_node,
        mlmd_connection=mlmd_handle,
        pipeline_info=pipeline_ir.pipeline_info,
        pipeline_runtime_spec=pipeline_ir.runtime_spec,
        executor_spec=node_executor_spec,
        custom_driver_spec=node_driver_spec,
        custom_executor_operators=operator_overrides)

    logging.info('Component %s is running.', target_node_id)
    execution_info = node_launcher.launch()
    logging.info('Component %s is finished.', target_node_id)

  # Dump the UI metadata.
  _dump_ui_metadata(pipeline_node, execution_info, flags.metadata_ui_path)