def _visit_partitionable_operation()

in tensorflow_transform/beam/analysis_graph_builder.py [0:0]


  def _visit_partitionable_operation(self, operation_def, upstream_views):

    # This is a hint for whether the `fine_grained_view` should be used
    # downstream.  It should be set to True if either an upstream view has
    # caching operations that haven't been flattened yet, or the current
    # operation is cacheable.
    all_fine_grained_views_available = all(
        v.fine_grained_view for v in upstream_views)
    prefer_fine_grained_view = (
        any(v.prefer_fine_grained_view for v in upstream_views) or
        (all_fine_grained_views_available and
         operation_def.cache_coder is not None))

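    # Combine the upstream hashed paths with this operation to form the hashed
    # path that identifies the operation's outputs for caching.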
    next_hashed_path = self._make_next_hashed_path(
        [v.hashed_path for v in upstream_views], operation_def)
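    # When every upstream fine-grained view is present, push the operation
    # through them; otherwise emit a None placeholder per output so the
    # pairing with `flattened_views` below still lines up.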
    if all_fine_grained_views_available:
      fine_grained_views = (self._apply_operation_on_fine_grained_view(
          operation_def, tuple(v.fine_grained_view for v in upstream_views),
          next_hashed_path),)
    else:
      fine_grained_views = (None,) * operation_def.num_outputs

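    # The flattened view is always produced by applying the operation to the
    # upstream flattened views.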
    flattened_views = nodes.OperationNode(
        operation_def, tuple(v.flattened_view for v in upstream_views)).outputs

    assert len(fine_grained_views) == len(flattened_views)
    return tuple(
        _OptimizationView(  # pylint: disable=g-complex-comprehension
            prefer_fine_grained_view=prefer_fine_grained_view,
            flattened_view=flat,
            fine_grained_view=fine,
            hashed_path=next_hashed_path)
        for flat, fine in zip(flattened_views, fine_grained_views))
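
For intuition, here is a minimal, runnable sketch of the hint's truth table.
`View` and `is_cacheable` are hypothetical stand-ins for `_OptimizationView`
and the `operation_def.cache_coder is not None` check; neither is part of the
tensorflow_transform API.

from typing import NamedTuple, Optional, Tuple


class View(NamedTuple):
  prefer_fine_grained_view: bool
  fine_grained_view: Optional[object]


def prefer_fine_grained(upstream: Tuple[View, ...],
                        is_cacheable: bool) -> bool:
  # Mirrors the boolean logic above: any upstream preference wins; otherwise
  # a cacheable operation whose inputs all have fine-grained views opts in.
  all_available = all(v.fine_grained_view for v in upstream)
  return (any(v.prefer_fine_grained_view for v in upstream) or
          (all_available and is_cacheable))


# The preference propagates downstream once any upstream view sets it.
assert prefer_fine_grained((View(True, None),), is_cacheable=False)
# A cacheable operation whose inputs all carry fine-grained views opts in.
assert prefer_fine_grained((View(False, object()),), is_cacheable=True)
# A missing fine-grained view on a non-preferring upstream disables the hint.
assert not prefer_fine_grained((View(False, None),), is_cacheable=True)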