in samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexTask.java [67:118]
/**
 * Rebuilds the topology held in {@code this.dag} inside the supplied {@code dag}.
 *
 * <p>The work is done in four steps:
 * <ol>
 *   <li>Operators and streams of {@code this.dag} are copied into a scratch
 *       {@link LogicalPlan}, which is handed to {@code detectLoops(dag2, conf)}
 *       (presumably this is what fills {@code loopStreams} - confirm against
 *       {@code detectLoops}).</li>
 *   <li>Every operator is added to {@code dag} together with its operator
 *       attributes and the attributes of its connected input ports.</li>
 *   <li>Every stream is re-created. A stream contained in {@code loopStreams} is
 *       split in two and routed through a new {@code DelayOperatorSerializable};
 *       all other streams are copied as they are.</li>
 *   <li>DAG-wide attributes (operator timeout window count, application name)
 *       are set.</li>
 * </ol>
 *
 * <p>Each stream is registered exactly once and its sinks are attached one by
 * one, so a stream with several sinks does not register the same stream id
 * more than once.
 *
 * @param dag  the DAG to populate
 * @param conf configuration, only forwarded to {@code detectLoops}
 * @throws IllegalStateException    if a stream in {@code loopStreams} has no sink
 * @throws IllegalArgumentException if a regular stream has a null source or sink port
 */
public void populateDAG(DAG dag, Configuration conf) {
  // Step 1: scratch copy of the topology, used only as input for loop detection.
  LogicalPlan dag2 = new LogicalPlan();
  for (OperatorMeta o : this.dag.getAllOperators()) {
    dag2.addOperator(o.getName(), o.getOperator());
  }
  for (StreamMeta s : this.dag.getAllStreams()) {
    // Created lazily so that a stream without sinks is skipped, as before.
    DAG.StreamMeta copy = null;
    for (InputPortMeta i : s.getSinks()) {
      Operator.OutputPort<Object> op = (OutputPort<Object>) s.getSource().getPortObject();
      Operator.InputPort<Object> ip = (InputPort<Object>) i.getPortObject();
      if (copy == null) {
        copy = dag2.addStream(s.getName()).setSource(op);
      }
      copy.addSink(ip);
    }
  }
  detectLoops(dag2, conf);

  // Step 2: re-create the operators, carrying over operator and input-port attributes.
  for (OperatorMeta o : this.dag.getAllOperators()) {
    dag.addOperator(o.getName(), o.getOperator());
    for (Entry<Attribute<?>, Object> attr : o.getAttributes().entrySet()) {
      // Raw cast: the wildcard key type cannot be matched to the value type statically.
      dag.setAttribute(o.getOperator(), (Attribute) attr.getKey(), attr.getValue());
    }
    for (InputPortMeta meta : o.getInputStreams().keySet()) {
      AttributeMap map = meta.getAttributes();
      for (Entry<Attribute<?>, Object> entry : map.entrySet()) {
        dag.setInputPortAttribute(meta.getPortObject(), (Attribute) entry.getKey(), entry.getValue());
      }
    }
  }

  // Step 3: re-create the streams.
  for (StreamMeta s : this.dag.getAllStreams()) {
    if (loopStreams.contains(s)) {
      // Fail before touching the DAG instead of half-way through the rewiring.
      Preconditions.checkState(!s.getSinks().isEmpty(), "Loop stream %s has no sink", s.getName());

      // Add delay Operator: source -> delay.input, delay.output -> every original sink.
      DelayOperatorSerializable<ContentEvent> d = dag.addOperator("Delay_" + s.getName(),
          new DelayOperatorSerializable<ContentEvent>());
      dag.addStream("Delay" + s.getName() + "toDelay",
          (DefaultOutputPort<ContentEvent>) s.getSource().getPortObject(), d.input);
      dag.setInputPortAttribute(d.input, Context.PortContext.STREAM_CODEC,
          new JavaSerializationStreamCodec<ContentEvent>());

      DAG.StreamMeta fromDelay = dag.addStream("Delay" + s.getName() + "fromDelay").setSource(d.output);
      for (InputPortMeta i : s.getSinks()) {
        DefaultInputPort<ContentEvent> ip = (DefaultInputPort<ContentEvent>) i.getPortObject();
        fromDelay.addSink(ip);
        dag.setInputPortAttribute(ip, Context.PortContext.STREAM_CODEC,
            new JavaSerializationStreamCodec<ContentEvent>());
      }
      continue;
    }

    // Regular stream: one stream id, any number of sinks.
    DAG.StreamMeta stream = null;
    for (InputPortMeta i : s.getSinks()) {
      DefaultOutputPort<Object> op = (DefaultOutputPort<Object>) s.getSource().getPortObject();
      DefaultInputPort<Object> ip = (DefaultInputPort<Object>) i.getPortObject();
      Preconditions.checkArgument(op != null && ip != null,
          "Stream %s has a null source or sink port", s.getName());
      if (stream == null) {
        stream = dag.addStream(s.getName()).setSource(op);
      }
      stream.addSink(ip);
    }
  }

  // Step 4: DAG-wide settings. The timeout is a DAG-level attribute, so it is set once
  // here rather than once per operator.
  dag.setAttribute(Context.OperatorContext.TIMEOUT_WINDOW_COUNT, 300);
  dag.setAttribute(Context.DAGContext.APPLICATION_NAME, appName);
}