# Source: tensorflow_transform/beam/analysis_graph_builder.py [0:0]
def build(graph,
          input_signature,
          output_signature,
          dataset_keys=None,
          cache_dict=None):
  """Builds the analysis graph describing how to execute the pipeline.

  The graph is assumed to contain some `Analyzer`s which must be executed by
  doing a full pass over the dataset, and passing the inputs for that analyzer
  into some implementation, then taking the results and replacing the
  `Analyzer`s outputs with constants in the graph containing these results.

  The execution plan is organized in phases. Each phase covers the `Analyzer`s
  which are ready to run in that phase, together with the table initializers
  that are ready to run in that phase.

  An `Analyzer` or op is ready to run when all its dependencies in the graph
  have been computed. Thus if the graph is constructed by

      def preprocessing_fn(inputs):
        x = inputs['x']
        scaled_0 = x - tft.min(x)
        scaled_0_1 = scaled_0 / tft.max(scaled_0)

  Then the first phase will contain the analyzer corresponding to the call to
  `min`, because `x` is an input and so is ready to compute in the first phase,
  while the second phase will contain the analyzer corresponding to the call to
  `max` since `scaled_0` depends on the result of the call to `tft.min` which
  is computed in the first phase.

  More generally, we define a level for each op and each `Analyzer` by walking
  the graph, assigning to each operation the max level of its inputs, to each
  `Tensor` the level of its operation, unless it's the output of an `Analyzer`
  in which case we assign the level of its `Analyzer` plus one.

  Side effects:
    * The `analyzer_nodes.TENSOR_REPLACEMENTS` collection of `graph` is read
      and then cleared.
    * The module-level `_ANALYSIS_GRAPH` is set to the first returned value.

  Args:
    graph: A `tf.Graph`.
    input_signature: A dict whose keys are strings and values are `Tensor`s or
      `SparseTensor`s.
    output_signature: A dict whose keys are strings and values are `Tensor`s or
      `SparseTensor`s.
    dataset_keys: (Optional) A set of strings which are dataset keys, they
      uniquely identify these datasets across analysis runs.
    cache_dict: (Optional): A cache dictionary.

  Returns:
    A pair of:
      * The future node for the final SavedModel, after the cache and combiner
        packing optimizations have been applied.
      * A dictionary of output cache `ValueNode`s.

  Raises:
    ValueError: if the graph cannot be analyzed, including the case where a
      phase completes without any pending tensor replacement becoming ready
      (which would otherwise make this function loop forever).
  """
  tensor_sinks = graph.get_collection(analyzer_nodes.TENSOR_REPLACEMENTS)
  graph.clear_collection(analyzer_nodes.TENSOR_REPLACEMENTS)

  phase = 0
  tensor_bindings = []
  # Maps each (hashable) sink tensor to whether its binding has been created.
  sink_tensors_ready = {
      tf_utils.hashable_tensor_or_op(tensor_sink.tensor): False
      for tensor_sink in tensor_sinks
  }
  translate_visitor = _TranslateVisitor()
  translate_traverser = nodes.Traverser(translate_visitor)

  analyzers_input_signature = {}
  graph_analyzer = None

  extracted_input_node = nodes.apply_operation(
      beam_nodes.ExtractInputForSavedModel,
      dataset_key=analyzer_cache._make_flattened_dataset_key(),  # pylint: disable=protected-access
      label='ExtractInputForSavedModel[FlattenedDataset]')

  while not all(sink_tensors_ready.values()):
    infix = 'Phase{}'.format(phase)
    # Determine which table init ops are ready to run in this phase, and which
    # pending tensor sinks are ready to run in this phase, based on whether
    # their dependencies are ready. The analyzer is rebuilt every phase from a
    # snapshot of `sink_tensors_ready`, so sinks bound during this phase only
    # unblock their dependants in the next phase.
    graph_analyzer = graph_tools.InitializableGraphAnalyzer(
        graph, input_signature, list(sink_tensors_ready.items()),
        graph_tools.describe_path_as_analyzer_cache_hash)
    ready_traverser = nodes.Traverser(_ReadyVisitor(graph_analyzer))

    # Now create and apply a SavedModel with all tensors in tensor_bindings
    # bound, which outputs all the tensors in the required tensor tuples.
    # NOTE: `intermediate_output_signature` is handed to CreateSavedModel while
    # still empty and is filled in afterwards through `translate_visitor`, so
    # the statement order below matters.
    intermediate_output_signature = collections.OrderedDict()
    saved_model_future = nodes.apply_operation(
        beam_nodes.CreateSavedModel,
        *tensor_bindings,
        table_initializers=tuple(graph_analyzer.ready_table_initializers),
        output_signature=intermediate_output_signature,
        label='CreateSavedModelForAnalyzerInputs[{}]'.format(infix))

    extracted_values_dict = nodes.apply_operation(
        beam_nodes.ApplySavedModel,
        saved_model_future,
        extracted_input_node,
        phase=phase,
        label='ApplySavedModel[{}]'.format(infix))

    translate_visitor.phase = phase
    translate_visitor.intermediate_output_signature = (
        intermediate_output_signature)
    translate_visitor.extracted_values_dict = extracted_values_dict

    made_progress = False
    for tensor, value_node, is_asset_filepath in tensor_sinks:
      hashable_tensor = tf_utils.hashable_tensor_or_op(tensor)
      # Don't compute a binding/sink/replacement that's already been computed
      if sink_tensors_ready[hashable_tensor]:
        continue

      if not ready_traverser.visit_value_node(value_node):
        continue

      translated_value_node = translate_traverser.visit_value_node(value_node)

      name = _tensor_name(tensor)
      tensor_bindings.append(
          nodes.apply_operation(
              beam_nodes.CreateTensorBinding,
              translated_value_node,
              tensor_name=str(tensor.name),
              dtype_enum=tensor.dtype.as_datatype_enum,
              is_asset_filepath=is_asset_filepath,
              label=analyzer_nodes.sanitize_label(
                  'CreateTensorBinding[{}]'.format(name))))
      sink_tensors_ready[hashable_tensor] = True
      made_progress = True

    if not made_progress:
      # Nothing changed in this phase, so the next phase would start from the
      # exact same state and the loop would never terminate. Fail loudly.
      pending_names = [
          str(tensor_sink.tensor.name)
          for tensor_sink in tensor_sinks
          if not sink_tensors_ready[
              tf_utils.hashable_tensor_or_op(tensor_sink.tensor)]
      ]
      raise ValueError(
          'The graph cannot be analyzed: no analyzer outputs became ready in '
          '{} while the following tensors are still pending: {}'.format(
              infix, pending_names))

    analyzers_input_signature.update(intermediate_output_signature)
    phase += 1

  # We need to make sure that the representation of this output_signature is
  # deterministic.
  output_signature = collections.OrderedDict(
      sorted(output_signature.items(), key=lambda t: t[0]))

  # TODO(KesterTong): check all table initializers are ready, check all output
  # tensors are ready.
  saved_model_future = nodes.apply_operation(
      beam_nodes.CreateSavedModel,
      *tensor_bindings,
      table_initializers=tuple(
          graph.get_collection(tf.compat.v1.GraphKeys.TABLE_INITIALIZERS)),
      output_signature=output_signature,
      label='CreateSavedModel')

  # `graph_analyzer` is only None when no phase ran, in which case
  # `analyzers_input_signature` is empty and the comprehension body never runs.
  tensor_keys_to_paths = {
      tensor_key:
          graph_analyzer.get_unique_path(analyzers_input_signature[tensor_key])
      for tensor_key in analyzers_input_signature
  }
  (optimized_saved_model_future,
   output_cache_value_nodes) = _perform_cache_optimization(
       saved_model_future, dataset_keys, tensor_keys_to_paths, cache_dict)

  (optimized_saved_model_future, output_cache_value_nodes) = (
      combiner_packing_util.perform_combiner_packing_optimization(
          optimized_saved_model_future, output_cache_value_nodes, phase))

  # Record the most recently built analysis graph at module level.
  global _ANALYSIS_GRAPH
  _ANALYSIS_GRAPH = optimized_saved_model_future
  return optimized_saved_model_future, output_cache_value_nodes