in tensorflow_transform/beam/analysis_graph_builder.py [0:0]
def _visit_partitionable_operation(self, operation_def, upstream_views):
# This is a hint for whether or not the `fine_grained_view` should be used
# downstream. It should be set to true if either the upstream view has
# cacheing operations that haven't been flattened yet, or the current
# operation is cacheable.
all_fine_grained_views_available = all(
v.fine_grained_view for v in upstream_views)
prefer_fine_grained_view = (
any(v.prefer_fine_grained_view for v in upstream_views) or
all_fine_grained_views_available and
operation_def.cache_coder is not None)
next_hashed_path = self._make_next_hashed_path(
[v.hashed_path for v in upstream_views], operation_def)
if all_fine_grained_views_available:
fine_grained_views = (self._apply_operation_on_fine_grained_view(
operation_def, tuple(v.fine_grained_view for v in upstream_views),
next_hashed_path),)
else:
fine_grained_views = (None,) * operation_def.num_outputs
flattened_views = nodes.OperationNode(
operation_def, tuple(v.flattened_view for v in upstream_views)).outputs
assert len(fine_grained_views) == len(flattened_views)
return tuple(
_OptimizationView( # pylint: disable=g-complex-comprehension
prefer_fine_grained_view=prefer_fine_grained_view,
flattened_view=flat,
fine_grained_view=fine,
hashed_path=next_hashed_path)
for flat, fine in zip(flattened_views, fine_grained_views))