in wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java [96:183]
private WayangPlan buildPlan(WayangPlanProto plan){
System.out.println(plan);
PlanProto planProto = plan.getPlan();
LinkedList<OperatorProto> protoList = new LinkedList<>();
planProto.getSourcesList().forEach(protoList::addLast);
Map<String, OperatorBase> operators = new HashMap<>();
List<OperatorBase> sinks = new ArrayList<>();
while(! protoList.isEmpty()) {
OperatorProto proto = protoList.pollFirst();
/* Checking if protoOperator can be connected to the current WayangPlan*/
boolean processIt;
if(proto.getType().equals("source")) processIt = true;
else {
/* Checking if ALL predecessors were already processed */
processIt = true;
for(String predecessor : proto.getPredecessorsList()){
if (!operators.containsKey(predecessor)) {
processIt = false;
break;
}
}
}
/* Operators should not be processed twice*/
if(operators.containsKey(proto.getId())) processIt = false;
if(processIt) {
/* Create and store Wayang operator */
OperatorBase operator = createOperatorByType(proto);
operators.put(proto.getId(), operator);
/*TODO Connect with predecessors requires more details in connection slot*/
int order = 0;
for (String pre_id : proto.getPredecessorsList()) {
OperatorBase predecessor = operators.get(pre_id);
/* Only works without replicate topology */
predecessor.connectTo(0, operator, order);
order++;
if(proto.getType().toLowerCase().contains("sink")){
sinks.add(operator);
//if(!sinks.contains(operator)) {
// sinks.add(operator);
//}
}
}
/*List of OperatorProto successors
* They will be added to the protoList
* nevertheless they must be processed only if the parents are in operators list */
List<OperatorProto> listSuccessors = planProto.getOperatorsList()
.stream()
.filter(t -> proto.getSuccessorsList().contains(t.getId()))
.collect(Collectors.toList());
for (OperatorProto successor : listSuccessors){
if(!protoList.contains(successor)){
protoList.addLast(successor);
}
}
List<OperatorProto> sinkSuccessors = planProto.getSinksList()
.stream()
.filter(t -> proto.getSuccessorsList().contains(t.getId()))
.collect(Collectors.toList());
for (OperatorProto successor : sinkSuccessors){
if(!protoList.contains(successor)){
protoList.addLast(successor);
}
}
} else {
/* In case we cannot process it yet, It must be added again at the end*/
protoList.addLast(proto);
}
}
WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
return wayangPlan;
}