def _get_items_to_clone()

in tensorflow_transform/beam/deep_copy.py [0:0]


def _get_items_to_clone(pcollection):
  """Get dependency-sorted list of PCollections and PTransforms to clone.

  This method returns a list of items, each of which is either a PCollection or
  PTransform, that need to be cloned when creating a deep copy. This list is
  sorted in dependency order, i.e. each PCollection or PTransform in the list
  occurs before any of its downstream consumers.

  The pipeline graph is walked upstream from `pcollection` breadth-first,
  stopping at `PBegin` nodes and at materialization boundaries. For every
  PCollection reached, its producing applied PTransform is collected together
  with all of that transform's outputs. A breadth-first walk orders nodes by
  distance rather than by dependency, so the collected items are afterwards
  re-sorted so that every item follows the collected items it depends on (a
  PCollection depends on its producer, a PTransform on its inputs).

  Args:
    pcollection: PCollection to be deep-copied.

  Returns:
    A dependency-sorted list of PCollections and PTransforms to clone, in which
    each item appears exactly once.

  Raises:
    ValueError: if the input PCollection is invalid.
  """
  assert isinstance(pcollection, PCollection)
  # List of items (either PCollection or PTransform), in reverse discovery
  # order (i.e. here, consumers are discovered before their producers).
  reversed_to_clone = []
  # Queue of PCollections to be processed in traversal of pipeline graph.
  to_process = queue.Queue()
  # Set of items (PCollections and PTransforms) already seen during pipeline
  # graph traversal.
  seen = set()

  to_process.put(pcollection)
  seen.add(pcollection)
  while not to_process.empty():
    current_pcollection = to_process.get()

    # Stop if we have reached the beginning of the pipeline, or at a
    # materialization boundary.
    if (isinstance(current_pcollection, pvalue.PBegin) or
        _is_at_materialization_boundary(current_pcollection)):
      continue

    reversed_to_clone.append(current_pcollection)
    applied_transform = current_pcollection.producer
    if applied_transform is None:
      raise ValueError(
          'PCollection node has invalid producer: %s' % current_pcollection)

    # Visit the input PCollection(s), and also add other outputs of that applied
    # PTransform.
    if applied_transform in seen:
      continue
    for output in applied_transform.outputs.values():
      assert isinstance(output, PCollection), output
      if output not in seen:
        reversed_to_clone.append(output)
        # Mark sibling outputs as seen: otherwise reaching one of them later as
        # the input of another transform would queue and append it again.
        seen.add(output)
    seen.add(applied_transform)
    reversed_to_clone.append(applied_transform)
    for input_pcollection in applied_transform.inputs:
      if input_pcollection not in seen:
        to_process.put(input_pcollection)
        seen.add(input_pcollection)

  # Reversed breadth-first discovery order is not always a dependency order:
  # when two paths of different lengths reach the same upstream node, a
  # consumer can end up before the PCollection it reads. Re-sort with a
  # depth-first pass that emits each item only after its collected upstream
  # dependencies. Roots are taken in reversed discovery order, so the result
  # equals the plain reversal whenever that reversal was already valid.
  candidates = list(reversed(reversed_to_clone))
  candidate_set = set(candidates)

  def upstream_of(item):
    # A PCollection depends on its producer; an applied PTransform depends on
    # its inputs. Only dependencies that are themselves being cloned matter.
    if isinstance(item, PCollection):
      dependencies = [item.producer]
    else:
      dependencies = list(item.inputs)
    return [dep for dep in dependencies if dep in candidate_set]

  to_clone = []
  emitted = set()
  for root in candidates:
    # Explicit stack (rather than recursion) so deep pipelines cannot hit the
    # interpreter recursion limit. The flag records whether the item's
    # dependencies have already been pushed and processed.
    stack = [(root, False)]
    while stack:
      item, dependencies_done = stack.pop()
      if item in emitted:
        continue
      if dependencies_done:
        emitted.add(item)
        to_clone.append(item)
        continue
      stack.append((item, True))
      for dependency in reversed(upstream_of(item)):
        if dependency not in emitted:
          stack.append((dependency, False))

  return to_clone