def _dump_ui_metadata()

in tfx/orchestration/kubeflow/container_entrypoint.py [0:0]


def _dump_ui_metadata(
    node: pipeline_pb2.PipelineNode,
    execution_info: data_types.ExecutionInfo,
    ui_metadata_path: str = '/mlpipeline-ui-metadata.json') -> None:
  """Dump KFP UI metadata json file for visualization purpose.

  The dumped file always contains one inline Markdown view listing the
  execution properties, inputs and outputs of the node. In addition, one
  Tensorboard view is appended for every ModelRun output that has at least one
  materialized artifact.

  Args:
    node: associated TFX node.
    execution_info: runtime execution info for this component, including
      materialized inputs/outputs/execution properties and id.
    ui_metadata_path: path to dump ui metadata.
  """
  # Any of these maps may be unset on the execution info; treat an unset map
  # the same as an empty one so that rendering never fails on it.
  exec_properties = execution_info.exec_properties or {}
  input_dict = execution_info.input_dict or {}
  output_dict = execution_info.output_dict or {}

  def _render_channel(name: str, artifact_type: str,
                      artifacts: List[artifact.Artifact]) -> str:
    """Render one input/output key as a Markdown section.

    Args:
      name: input or output key.
      artifact_type: artifact type name declared for this key.
      artifacts: materialized artifacts for this key; may be empty.

    Returns:
      A Markdown string with the key as heading, its type and its artifacts.
    """
    rendered_artifacts = ''.join(
        _render_artifact_as_mdstr(single_artifact)
        for single_artifact in artifacts)
    return '## {name}\n\n**Type**: {channel_type}\n\n{artifacts}'.format(
        name=_sanitize_underscore(name),
        channel_type=_sanitize_underscore(artifact_type),
        artifacts=rendered_artifacts)

  # Execution properties section.
  exec_properties_list = [
      '**{}**: {}'.format(
          _sanitize_underscore(name), _sanitize_underscore(exec_property))
      for name, exec_property in exec_properties.items()
  ]
  src_str_exec_properties = '# Execution properties:\n{}'.format(
      '\n\n'.join(exec_properties_list) or 'No execution property.')

  # Inputs section. There must be at least a channel in an input, and all
  # channels in an input share the same artifact type, so the type is read
  # from the first channel.
  rendered_inputs = [
      _render_channel(name, spec.channels[0].artifact_query.type.name,
                      input_dict.get(name, []))
      for name, spec in node.inputs.inputs.items()
  ]
  src_str_inputs = '# Inputs:\n{}'.format(
      ''.join(rendered_inputs) or 'No input.')

  # Outputs section. The artifact type is declared directly on the output spec.
  rendered_outputs = [
      _render_channel(name, spec.artifact_spec.type.name,
                      output_dict.get(name, []))
      for name, spec in node.outputs.outputs.items()
  ]
  src_str_outputs = '# Outputs:\n{}'.format(
      ''.join(rendered_outputs) or 'No output.')

  outputs = [{
      'storage':
          'inline',
      'source':
          '{exec_properties}\n\n{inputs}\n\n{outputs}'.format(
              exec_properties=src_str_exec_properties,
              inputs=src_str_inputs,
              outputs=src_str_outputs),
      'type':
          'markdown',
  }]

  # Add Tensorboard view for ModelRun outputs.
  for name, spec in node.outputs.outputs.items():
    if spec.artifact_spec.type.name != standard_artifacts.ModelRun.TYPE_NAME:
      continue
    model_runs = output_dict.get(name)
    if not model_runs:
      # No artifact was materialized for this ModelRun output, so there is no
      # uri to point Tensorboard at. Skip the view instead of failing the dump.
      continue
    # Only the first artifact of the output is used as the Tensorboard source.
    outputs.append({'type': 'tensorboard', 'source': model_runs[0].uri})

  metadata_dict = {'outputs': outputs}

  with open(ui_metadata_path, 'w') as f:
    json.dump(metadata_dict, f)