in tensorflow_transform/beam/deep_copy.py [0:0]
def _get_items_to_clone(pcollection):
  """Get dependency-sorted list of PCollections and PTransforms to clone.

  This method returns a list of items, each of which is either a PCollection or
  PTransform, that need to be cloned when creating a deep copy. This list is
  sorted in dependency order, i.e. each PCollection or PTransform in the list
  occurs before any of its downstream consumers, and each item occurs exactly
  once.

  Items are discovered by walking the pipeline graph upstream from
  `pcollection`, stopping at `pvalue.PBegin` and at materialization
  boundaries. Every output of a visited producer is included, including
  outputs that are not themselves on the path to `pcollection`.

  Args:
    pcollection: PCollection to be deep-copied.

  Returns:
    A dependency-sorted list of PCollections and PTransforms to clone.

  Raises:
    ValueError: if the input PCollection is invalid (a visited PCollection has
      no producer, or the traversed graph contains a cycle).
  """
  assert isinstance(pcollection, PCollection)

  # Phase 1: discover every item to clone with a breadth-first walk upstream
  # from `pcollection`. `discovered` lists items in discovery order (consumers
  # generally before producers); it contains no duplicates.
  discovered = []
  # Queue of PCollections to be processed in traversal of pipeline graph.
  to_process = queue.Queue()
  # Items (PCollections and PTransforms) already seen during the traversal. A
  # PCollection is marked as seen both when it is queued and when it is
  # recorded as a sibling output, so it can never be recorded twice.
  seen = set()
  to_process.put(pcollection)
  seen.add(pcollection)
  while not to_process.empty():
    current_pcollection = to_process.get()
    # Stop if we have reached the beginning of the pipeline, or at a
    # materialization boundary.
    if (isinstance(current_pcollection, pvalue.PBegin) or
        _is_at_materialization_boundary(current_pcollection)):
      continue
    discovered.append(current_pcollection)
    applied_transform = current_pcollection.producer
    if applied_transform is None:
      raise ValueError(
          'PCollection node has invalid producer: %s' % current_pcollection)
    # The producer may already have been visited through another of its
    # outputs that was queued earlier; its inputs are then already handled.
    if applied_transform in seen:
      continue
    # Record the other outputs of that applied PTransform, then visit its
    # input PCollection(s).
    for output in applied_transform.outputs.values():
      assert isinstance(output, PCollection), output
      if output not in seen:
        seen.add(output)
        discovered.append(output)
    seen.add(applied_transform)
    discovered.append(applied_transform)
    for input_pcollection in applied_transform.inputs:
      if input_pcollection not in seen:
        to_process.put(input_pcollection)
        seen.add(input_pcollection)

  # Phase 2: order the discovered items so each comes after everything it
  # depends on: a PTransform after its discovered inputs, a PCollection after
  # its discovered producer. Simply reversing the breadth-first order is not
  # enough when a PCollection feeds more than one visited consumer, so do an
  # explicit (iterative, to avoid recursion limits) depth-first topological
  # sort. Roots are taken in reversed discovery order so that, whenever that
  # order is already valid, it is reproduced unchanged.
  to_clone_set = set(discovered)

  def dependencies(item):
    """Returns the discovered items that must precede `item` in the result."""
    if isinstance(item, PCollection):
      candidates = (item.producer,)
    else:
      candidates = item.inputs
    # Inputs outside the clone set (PBegin, materialization boundaries) are
    # shared with the original pipeline and impose no ordering constraint.
    return [dep for dep in candidates if dep in to_clone_set]

  to_clone = []
  emitted = set()
  # Items whose dependencies are currently being emitted; meeting one of them
  # again before it is emitted means the graph has a cycle.
  in_progress = set()
  for root in reversed(discovered):
    stack = [(root, False)]
    while stack:
      item, dependencies_emitted = stack.pop()
      if dependencies_emitted:
        in_progress.discard(item)
        emitted.add(item)
        to_clone.append(item)
        continue
      if item in emitted:
        continue
      if item in in_progress:
        raise ValueError('Cycle detected in pipeline graph at: %s' % item)
      in_progress.add(item)
      stack.append((item, True))
      # Push in reverse so dependencies are emitted in their listed order.
      for dep in reversed(dependencies(item)):
        if dep not in emitted:
          stack.append((dep, False))
  return to_clone