def build()

in tensorflow_transform/beam/analysis_graph_builder.py
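
# NOTE: module-level imports reconstructed from the identifiers used in
# build() and the tensorflow_transform package layout; the actual import
# block in the source file may differ slightly. Private helpers such as
# _TranslateVisitor, _ReadyVisitor, _tensor_name, _perform_cache_optimization
# and the _ANALYSIS_GRAPH global are defined elsewhere in this module.
import collections

import tensorflow as tf
from tensorflow_transform import analyzer_nodes
from tensorflow_transform import graph_tools
from tensorflow_transform import nodes
from tensorflow_transform import tf_utils
from tensorflow_transform.beam import analyzer_cache
from tensorflow_transform.beam import beam_nodes
from tensorflow_transform.beam import combiner_packing_util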


def build(graph,
          input_signature,
          output_signature,
          dataset_keys=None,
          cache_dict=None):
  """Returns a list of `Phase`s describing how to execute the pipeline.

  The default graph is assumed to contain some `Analyzer`s, which must be
  executed by doing a full pass over the dataset, passing each analyzer's
  inputs into some implementation, and then replacing the `Analyzer`'s outputs
  in the graph with constants containing the results.

  The execution plan is described by a list of `Phase`s.  Each phase contains
  the `Analyzer`s that are ready to run in that phase, together with the ops
  (table initializers) that are ready to run in that phase.

  An `Analyzer` or op is ready to run when all its dependencies in the graph
  have been computed.  Thus if the graph is constructed by

  def preprocessing_fn(inputs):
    x = inputs['x']
    scaled_0 = x - tft.min(x)
    scaled_0_1 = scaled_0 / tft.max(scaled_0)

  then the first phase will contain the analyzer corresponding to the call to
  `min`, because its input `x` is a graph input and so is ready to compute in
  the first phase, while the second phase will contain the analyzer
  corresponding to the call to `max`, since its input `scaled_0` depends on
  the result of the call to `tft.min`, which is computed in the first phase.

  More generally, we define a level for each op and each `Analyzer` by walking
  the graph, assigning to each operation the max level of its inputs, and to
  each `Tensor` the level of its operation, unless the `Tensor` is the output
  of an `Analyzer`, in which case we assign the level of its `Analyzer` plus
  one.
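
  For instance, applying this rule to the `preprocessing_fn` above yields:

    x                  level 0  (a graph input)
    tft.min(x)         level 0, so its output tensor has level 1
    scaled_0           level 1
    tft.max(scaled_0)  level 1, so its output tensor has level 2
    scaled_0_1         level 2

  An analyzer at level N becomes ready in phase N, so `tft.min` runs in the
  first phase and `tft.max` in the second, as described above.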

  Args:
    graph: A `tf.Graph`.
    input_signature: A dict whose keys are strings and values are `Tensor`s or
      `SparseTensor`s.
    output_signature: A dict whose keys are strings and values are `Tensor`s or
      `SparseTensor`s.
    dataset_keys: (Optional) A set of strings which are dataset keys; they
      uniquely identify these datasets across analysis runs.
    cache_dict: (Optional) A cache dictionary.

  Returns:
    A pair of:
      * A `ValueNode` representing the final `CreateSavedModel` operation
        (after cache and combiner-packing optimizations), whose outputs
        implement `output_signature` with all analyzer results bound.
      * A dictionary of output cache `ValueNode`s.

  Raises:
    ValueError: if the graph cannot be analyzed.
  """
  tensor_sinks = graph.get_collection(analyzer_nodes.TENSOR_REPLACEMENTS)
  graph.clear_collection(analyzer_nodes.TENSOR_REPLACEMENTS)
  phase = 0
  tensor_bindings = []
  sink_tensors_ready = {
      tf_utils.hashable_tensor_or_op(tensor_sink.tensor):
          False for tensor_sink in tensor_sinks
  }
  translate_visitor = _TranslateVisitor()
  translate_traverser = nodes.Traverser(translate_visitor)

  analyzers_input_signature = {}
  graph_analyzer = None

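  # Every analysis phase reads its inputs from the flattened view of the
  # input datasets; this node represents extracting that flattened dataset
  # for the SavedModels applied below.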
  extracted_input_node = nodes.apply_operation(
      beam_nodes.ExtractInputForSavedModel,
      dataset_key=analyzer_cache._make_flattened_dataset_key(),  # pylint: disable=protected-access
      label='ExtractInputForSavedModel[FlattenedDataset]')

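  # Run one phase per analyzer "level": each iteration builds a SavedModel
  # from the bindings computed so far, applies it to the flattened dataset
  # to produce the inputs of the analyzers that are now ready, and creates a
  # tensor binding for each newly ready tensor sink.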
  while not all(sink_tensors_ready.values()):
    infix = 'Phase{}'.format(phase)
    # Determine which table init ops are ready to run in this phase, and
    # which tensor sinks (the keys of sink_tensors_ready) are ready to run
    # in this phase, based on whether their dependencies have been computed.
    graph_analyzer = graph_tools.InitializableGraphAnalyzer(
        graph, input_signature, list(sink_tensors_ready.items()),
        graph_tools.describe_path_as_analyzer_cache_hash)
    ready_traverser = nodes.Traverser(_ReadyVisitor(graph_analyzer))

    # Now create and apply a SavedModel with all tensors in tensor_bindings
    # bound, which outputs all the tensors in the required tensor tuples.
    intermediate_output_signature = collections.OrderedDict()
    saved_model_future = nodes.apply_operation(
        beam_nodes.CreateSavedModel,
        *tensor_bindings,
        table_initializers=tuple(graph_analyzer.ready_table_initializers),
        output_signature=intermediate_output_signature,
        label='CreateSavedModelForAnalyzerInputs[{}]'.format(infix))

    extracted_values_dict = nodes.apply_operation(
        beam_nodes.ApplySavedModel,
        saved_model_future,
        extracted_input_node,
        phase=phase,
        label='ApplySavedModel[{}]'.format(infix))

    translate_visitor.phase = phase
    translate_visitor.intermediate_output_signature = (
        intermediate_output_signature)
    translate_visitor.extracted_values_dict = extracted_values_dict
    for tensor, value_node, is_asset_filepath in tensor_sinks:
      hashable_tensor = tf_utils.hashable_tensor_or_op(tensor)
      # Don't compute a binding/sink/replacement that's already been computed
      if sink_tensors_ready[hashable_tensor]:
        continue

      if not ready_traverser.visit_value_node(value_node):
        continue

      translated_value_node = translate_traverser.visit_value_node(value_node)

      name = _tensor_name(tensor)
      tensor_bindings.append(
          nodes.apply_operation(
              beam_nodes.CreateTensorBinding,
              translated_value_node,
              tensor_name=str(tensor.name),
              dtype_enum=tensor.dtype.as_datatype_enum,
              is_asset_filepath=is_asset_filepath,
              label=analyzer_nodes.sanitize_label(
                  'CreateTensorBinding[{}]'.format(name))))
      sink_tensors_ready[hashable_tensor] = True

    analyzers_input_signature.update(intermediate_output_signature)
    phase += 1

  # We need to make sure that the representation of this output_signature is
  # deterministic.
  output_signature = collections.OrderedDict(
      sorted(output_signature.items(), key=lambda t: t[0]))

  # TODO(KesterTong): check all table initializers are ready, check all output
  # tensors are ready.
  saved_model_future = nodes.apply_operation(
      beam_nodes.CreateSavedModel,
      *tensor_bindings,
      table_initializers=tuple(
          graph.get_collection(tf.compat.v1.GraphKeys.TABLE_INITIALIZERS)),
      output_signature=output_signature,
      label='CreateSavedModel')

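  # Map each analyzer input key to a unique path describing how that tensor
  # is computed from the graph inputs; the cache optimization uses these
  # paths as stable cache identifiers (cf. the
  # describe_path_as_analyzer_cache_hash argument above).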
  tensor_keys_to_paths = {
      tensor_key:
      graph_analyzer.get_unique_path(analyzers_input_signature[tensor_key])
      for tensor_key in analyzers_input_signature
  }
  (optimized_saved_model_future,
   output_cache_value_nodes) = _perform_cache_optimization(
       saved_model_future, dataset_keys, tensor_keys_to_paths, cache_dict)

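  # Pack combiner-based analyzers that can share a pass into single
  # operations, reducing the number of distinct analysis operations (a
  # best-effort optimization; see combiner_packing_util).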
  (optimized_saved_model_future, output_cache_value_nodes) = (
      combiner_packing_util.perform_combiner_packing_optimization(
          optimized_saved_model_future, output_cache_value_nodes, phase))

  global _ANALYSIS_GRAPH
  _ANALYSIS_GRAPH = optimized_saved_model_future
  return optimized_saved_model_future, output_cache_value_nodes
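
For orientation, here is a minimal, hypothetical driver for build(). The real
callers live in tensorflow_transform/beam/impl.py and this module's tests; the
placeholder-based graph construction below is a sketch of that pattern, and
`tft.min`/`tft.max` are the analyzers from the docstring example.

import tensorflow as tf
import tensorflow_transform as tft
from tensorflow_transform.beam import analysis_graph_builder

def preprocessing_fn(inputs):
  x = inputs['x']
  scaled_0 = x - tft.min(x)
  return {'x_scaled': scaled_0 / tft.max(scaled_0)}

# build() expects a TF1-style graph in which preprocessing_fn has been traced
# over placeholder inputs.
with tf.compat.v1.Graph().as_default() as graph:
  input_signature = {'x': tf.compat.v1.placeholder(tf.float32, [None])}
  output_signature = preprocessing_fn(input_signature)
  saved_model_future, cache_nodes = analysis_graph_builder.build(
      graph, input_signature, output_signature)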