in python/old_code/protobuf/planwriter.py [0:0]
def process_pipeline(self, stage):
    """Register every node of ``stage`` as an operator of the plan.

    The nodes are visited in reverse order.  A node whose ``python_exec``
    flag is falsy is registered on its own through ``add_source``,
    ``add_sink`` or ``add_java_operator`` (chosen from
    ``node.operator.source`` / ``node.operator.sink``).  Consecutive nodes
    whose ``python_exec`` flag is truthy are fused: their UDFs are combined
    with ``self.concatenate``, their ids are joined with commas, and the
    group is registered once through ``add_operator`` as a
    ``"map_partition"`` operator.

    Side effects:
        * ``self.operator_references`` maps ``str(node id)`` to the value
          returned by the ``add_*`` call that registered the node (every
          member of a fused group maps to the same fused operator).
        * ``self.boundaries`` maps ``str(id)`` (or the comma-joined id of a
          fused group) to a dict holding ``"start"`` (predecessor keys)
          and/or ``"end"`` (successor keys).

    Args:
        stage: Reversible sequence of nodes.  Each node is read for ``id``,
            ``operator_type``, ``python_exec``, ``operator``,
            ``predecessors`` and ``successors``.
    """

    def register_nested(group_id, group_udf, group_predecessors, group_successors):
        # Register one fused group of Python nodes.  The operator handle
        # returned by add_operator must be the one stored for every member
        # id, and keys are always strings so that a single-node group (whose
        # id is the raw node.id) is handled like a comma-joined one.
        fused_op = self.add_operator(group_id, "map_partition", group_udf)
        group_key = str(group_id)
        for member_id in group_key.split(","):
            self.operator_references[member_id] = fused_op
        # Predecessors come from the first operator of the group,
        # successors from the last one.
        self.boundaries[group_key] = {
            "end": group_successors,
            "start": group_predecessors,
        }

    nested_udf = None
    nested_id = ""
    nested_predecessors = None
    nested_successors = None

    for node in reversed(stage):
        logging.debug(
            "%s executable: %s id: %s",
            node.operator_type, node.python_exec, node.id,
        )

        if node.python_exec:
            if nested_udf is None:
                # Visiting in reverse: the first Python node seen is the
                # last operator to execute in the map partition.
                nested_udf = node.operator.udf
                nested_id = node.id
                nested_successors = node.successors.keys()
            else:
                nested_udf = self.concatenate(nested_udf, node.operator.udf)
                nested_id = str(node.id) + "," + str(nested_id)
            # Overwritten on every Python node, so the value that survives
            # belongs to the earliest node of the group.
            nested_predecessors = node.predecessors.keys()
            continue

        # A non-Python node closes any fused group collected so far.
        if nested_udf is not None:
            register_nested(
                nested_id, nested_udf, nested_predecessors, nested_successors
            )
            nested_udf = None
            nested_id = ""
            nested_predecessors = None
            nested_successors = None

        node_key = str(node.id)
        if node.operator.source:
            op = self.add_source(node.id, node.operator_type, node.operator.udf)
            self.operator_references[node_key] = op
            self.boundaries[node_key] = {"end": node.successors.keys()}
        elif node.operator.sink:
            op = self.add_sink(node.id, node.operator_type, node.operator.udf)
            self.operator_references[node_key] = op
            self.boundaries[node_key] = {"start": node.predecessors.keys()}
        else:
            # Regular operator to be processed in Java; it may carry extra
            # parameters for the Java side.
            op = self.add_java_operator(
                node.id,
                node.operator_type,
                node.operator.udf,
                node.operator.parameters,
            )
            self.operator_references[node_key] = op
            self.boundaries[node_key] = {
                "start": node.predecessors.keys(),
                "end": node.successors.keys(),
            }

    # The stage may begin with Python nodes; that group is still pending
    # once the reverse walk is over.
    if nested_udf is not None:
        register_nested(
            nested_id, nested_udf, nested_predecessors, nested_successors
        )