in tfx/orchestration/kubeflow/container_entrypoint.py [0:0]
def _dump_ui_metadata(
node: pipeline_pb2.PipelineNode,
execution_info: data_types.ExecutionInfo,
ui_metadata_path: str = '/mlpipeline-ui-metadata.json') -> None:
"""Dump KFP UI metadata json file for visualization purpose.
For general components we just render a simple Markdown file for
exec_properties/inputs/outputs.
Args:
node: associated TFX node.
execution_info: runtime execution info for this component, including
materialized inputs/outputs/execution properties and id.
ui_metadata_path: path to dump ui metadata.
"""
exec_properties_list = [
'**{}**: {}'.format(
_sanitize_underscore(name), _sanitize_underscore(exec_property))
for name, exec_property in execution_info.exec_properties.items()
]
src_str_exec_properties = '# Execution properties:\n{}'.format(
'\n\n'.join(exec_properties_list) or 'No execution property.')
def _dump_input_populated_artifacts(
node_inputs: MutableMapping[str, pipeline_pb2.InputSpec],
name_to_artifacts: Dict[str, List[artifact.Artifact]]) -> List[str]:
"""Dump artifacts markdown string for inputs.
Args:
node_inputs: maps from input name to input sepc proto.
name_to_artifacts: maps from input key to list of populated artifacts.
Returns:
A list of dumped markdown string, each of which represents a channel.
"""
rendered_list = []
for name, spec in node_inputs.items():
# Need to look for materialized artifacts in the execution decision.
rendered_artifacts = ''.join([
_render_artifact_as_mdstr(single_artifact)
for single_artifact in name_to_artifacts.get(name, [])
])
# There must be at least a channel in a input, and all channels in a input
# share the same artifact type.
artifact_type = spec.channels[0].artifact_query.type.name
rendered_list.append(
'## {name}\n\n**Type**: {channel_type}\n\n{artifacts}'.format(
name=_sanitize_underscore(name),
channel_type=_sanitize_underscore(artifact_type),
artifacts=rendered_artifacts))
return rendered_list
def _dump_output_populated_artifacts(
node_outputs: MutableMapping[str, pipeline_pb2.OutputSpec],
name_to_artifacts: Dict[str, List[artifact.Artifact]]) -> List[str]:
"""Dump artifacts markdown string for outputs.
Args:
node_outputs: maps from output name to output sepc proto.
name_to_artifacts: maps from output key to list of populated artifacts.
Returns:
A list of dumped markdown string, each of which represents a channel.
"""
rendered_list = []
for name, spec in node_outputs.items():
# Need to look for materialized artifacts in the execution decision.
rendered_artifacts = ''.join([
_render_artifact_as_mdstr(single_artifact)
for single_artifact in name_to_artifacts.get(name, [])
])
# There must be at least a channel in a input, and all channels in a input
# share the same artifact type.
artifact_type = spec.artifact_spec.type.name
rendered_list.append(
'## {name}\n\n**Type**: {channel_type}\n\n{artifacts}'.format(
name=_sanitize_underscore(name),
channel_type=_sanitize_underscore(artifact_type),
artifacts=rendered_artifacts))
return rendered_list
src_str_inputs = '# Inputs:\n{}'.format(''.join(
_dump_input_populated_artifacts(
node_inputs=node.inputs.inputs,
name_to_artifacts=execution_info.input_dict or {})) or 'No input.')
src_str_outputs = '# Outputs:\n{}'.format(''.join(
_dump_output_populated_artifacts(
node_outputs=node.outputs.outputs,
name_to_artifacts=execution_info.output_dict or {})) or 'No output.')
outputs = [{
'storage':
'inline',
'source':
'{exec_properties}\n\n{inputs}\n\n{outputs}'.format(
exec_properties=src_str_exec_properties,
inputs=src_str_inputs,
outputs=src_str_outputs),
'type':
'markdown',
}]
# Add Tensorboard view for ModelRun outpus.
for name, spec in node.outputs.outputs.items():
if spec.artifact_spec.type.name == standard_artifacts.ModelRun.TYPE_NAME:
output_model = execution_info.output_dict[name][0]
# Add Tensorboard view.
tensorboard_output = {'type': 'tensorboard', 'source': output_model.uri}
outputs.append(tensorboard_output)
metadata_dict = {'outputs': outputs}
with open(ui_metadata_path, 'w') as f:
json.dump(metadata_dict, f)