private WayangPlan buildPlan()

in wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java [131:218]


    /**
     * Builds a {@link WayangPlan} from its protobuf description.
     *
     * <p>Operators are instantiated in dependency order: the queue is seeded with the
     * plan's sources, and every operator that gets built enqueues its successors. An
     * operator whose predecessors are not all built yet is moved to the back of the
     * queue and retried later.
     *
     * @param plan protobuf message describing the sources, operators and sinks
     * @return a plan rooted at the first sink that was built and connected
     * @throws IllegalArgumentException if some operator can never be built because a
     *         predecessor is missing (unknown id or cyclic dependency), if a processed
     *         operator references a predecessor that has not been built, or if the
     *         description yields no connected sink
     */
    private WayangPlan buildPlan(WayangPlanProto plan){

        // NOTE(review): debug dump of the incoming plan to stdout, kept from the
        // original; consider routing it through the project's logger instead.
        System.out.println(plan);

        PlanProto planProto = plan.getPlan();
        LinkedList<OperatorProto> pending = new LinkedList<>(planProto.getSourcesList());

        Map<String, OperatorBase> operators = new HashMap<>();
        List<OperatorBase> sinks = new ArrayList<>();

        // Number of operators deferred since the last one was built. Once it reaches
        // the queue size, every queued operator has been examined against the same
        // set of built operators, so no further progress is possible.
        int deferredInARow = 0;

        while (!pending.isEmpty()) {

            OperatorProto proto = pending.pollFirst();

            /* Operators must not be processed twice: drop the entry instead of
             * re-queueing it, otherwise it would circulate in the queue forever. */
            if (operators.containsKey(proto.getId())) {
                continue;
            }

            if (!predecessorsAlreadyBuilt(proto, operators)) {
                /* Cannot be processed yet: retry after the rest of the queue. */
                pending.addLast(proto);
                deferredInARow++;
                if (deferredInARow >= pending.size()) {
                    throw new IllegalArgumentException(
                            "Cannot build plan: operator '" + proto.getId()
                                    + "' waits for predecessors " + proto.getPredecessorsList()
                                    + " that are never built (unknown id or cyclic dependency)");
                }
                continue;
            }
            deferredInARow = 0;

            /* Create and store Wayang operator */
            OperatorBase operator = createOperatorByType(proto);
            operators.put(proto.getId(), operator);

            /* TODO Connect with predecessors requires more details in connection slot */
            int order = 0;
            for (String predecessorId : proto.getPredecessorsList()) {
                OperatorBase predecessor = operators.get(predecessorId);
                if (predecessor == null) {
                    // Only reachable for "source" operators, which skip the readiness check.
                    throw new IllegalArgumentException(
                            "Cannot build plan: operator '" + proto.getId()
                                    + "' references predecessor '" + predecessorId
                                    + "' which has not been built");
                }
                /* Only works without replicate topology: output slot 0 is always used */
                predecessor.connectTo(0, operator, order);
                order++;
            }

            /* Register a connected sink exactly once (not once per predecessor). A sink
             * without predecessors is still not registered, as before. */
            if (order > 0 && proto.getType().equals("sink")) {
                sinks.add(operator);
            }

            /* Successors are queued now; each one is only built once all of its
             * predecessors are present in the operators map. */
            List<String> successorIds = proto.getSuccessorsList();
            enqueueMatchingSuccessors(pending, planProto.getOperatorsList(), successorIds);
            enqueueMatchingSuccessors(pending, planProto.getSinksList(), successorIds);
        }

        if (sinks.isEmpty()) {
            throw new IllegalArgumentException(
                    "Cannot build plan: the description does not yield any connected sink");
        }

        // NOTE(review): only the first sink is handed to WayangPlan, as in the original;
        // confirm whether plans with several sinks should pass all of them.
        return new WayangPlan(sinks.get(0));
    }

    /**
     * Tells whether {@code proto} can be instantiated now: sources always can, any other
     * operator only once every predecessor id is a key of {@code operators}.
     */
    private static boolean predecessorsAlreadyBuilt(OperatorProto proto, Map<String, OperatorBase> operators) {
        if (proto.getType().equals("source")) {
            return true;
        }
        for (String predecessorId : proto.getPredecessorsList()) {
            if (!operators.containsKey(predecessorId)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Appends to {@code pending} every candidate whose id is listed in
     * {@code successorIds} and that is not already queued, preserving candidate order.
     */
    private static void enqueueMatchingSuccessors(LinkedList<OperatorProto> pending,
                                                  List<OperatorProto> candidates,
                                                  List<String> successorIds) {
        for (OperatorProto candidate : candidates) {
            if (successorIds.contains(candidate.getId()) && !pending.contains(candidate)) {
                pending.addLast(candidate);
            }
        }
    }