def process_pipeline()

in python/old_code/protobuf/planwriter.py [0:0]


    def process_pipeline(self, stage):
        """Register the operators of one pipeline stage with this writer.

        The stage is walked from its last node to its first. Consecutive
        nodes flagged ``python_exec`` are fused (via ``self.concatenate``)
        into a single ``"map_partition"`` operator; every other node is
        registered on its own as a source, a sink, or a Java operator.

        For each registered operator this records:

        * ``self.operator_references[str(node_id)]`` -> the created operator
          (every member id of a fused chain maps to the same operator);
        * ``self.boundaries[str(operator_id)]`` -> a dict with the ``"start"``
          (predecessor ids) and/or ``"end"`` (successor ids) of the operator.

        Args:
            stage: reversible sequence of nodes exposing ``id``,
                ``operator_type``, ``python_exec``, ``operator``,
                ``predecessors`` and ``successors``.
        """

        def register_nested(chain_id, chain_udf, predecessors, successors):
            # Emit one map_partition operator for the whole fused chain and
            # make every member id of the chain resolve to that operator.
            fused_op = self.add_operator(chain_id, "map_partition", chain_udf)
            chain_key = str(chain_id)
            for member_id in chain_key.split(","):
                self.operator_references[member_id] = fused_op
            # Predecessors come from the first operator of the chain,
            # successors from the last one.
            self.boundaries[chain_key] = {
                "end": successors,
                "start": predecessors,
            }

        # State of the chain of Python operators currently being fused.
        nested_udf = None
        nested_id = ""
        nested_predecessors = None
        nested_successors = None

        for node in reversed(stage):
            logging.debug(
                "%s executable: %s id: %s",
                node.operator_type, node.python_exec, node.id,
            )

            if node.python_exec:
                if nested_udf is None:
                    # Walking backwards, so the first Python node seen is the
                    # last one to execute inside the map partition.
                    nested_udf = node.operator.udf
                    nested_id = node.id
                    nested_successors = node.successors.keys()
                else:
                    nested_udf = self.concatenate(nested_udf, node.operator.udf)
                    nested_id = str(node.id) + "," + str(nested_id)

                # Overwritten on every iteration so that it ends up holding
                # the predecessors of the earliest node of the chain.
                nested_predecessors = node.predecessors.keys()
                continue

            # A non-Python node closes any chain of Python operators pending.
            if nested_udf is not None:
                register_nested(
                    nested_id, nested_udf, nested_predecessors, nested_successors
                )
                nested_udf = None
                nested_id = ""
                nested_predecessors = None
                nested_successors = None

            node_key = str(node.id)
            if node.operator.source:
                op = self.add_source(node.id, node.operator_type, node.operator.udf)
                self.operator_references[node_key] = op
                self.boundaries[node_key] = {"end": node.successors.keys()}

            elif node.operator.sink:
                op = self.add_sink(node.id, node.operator_type, node.operator.udf)
                self.operator_references[node_key] = op
                self.boundaries[node_key] = {"start": node.predecessors.keys()}

            else:
                # Regular operator to be processed in Java; these may carry
                # extra parameters for the Java side.
                op = self.add_java_operator(
                    node.id,
                    node.operator_type,
                    node.operator.udf,
                    node.operator.parameters,
                )
                self.operator_references[node_key] = op
                self.boundaries[node_key] = {
                    "start": node.predecessors.keys(),
                    "end": node.successors.keys(),
                }

        # A stage may begin with Python operators: flush the chain still
        # pending, referencing the operator that was actually created for it.
        if nested_udf is not None:
            register_nested(
                nested_id, nested_udf, nested_predecessors, nested_successors
            )