public void populateDAG()

in samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexTask.java [67:118]


  /**
   * Populates {@code dag} with the operators and streams held in {@code this.dag}.
   *
   * <p>Steps, in order:
   * <ol>
   *   <li>Rebuild the operators and streams of {@code this.dag} in a fresh
   *       {@link LogicalPlan} and hand that copy to {@code detectLoops}.</li>
   *   <li>Copy every operator, with its operator attributes and the attributes of its
   *       connected input ports, into {@code dag}.</li>
   *   <li>Copy every stream into {@code dag}; streams contained in {@code loopStreams}
   *       are routed through a {@link DelayOperatorSerializable} instead of being
   *       connected directly.</li>
   *   <li>Set the DAG-level timeout window count and application name.</li>
   * </ol>
   *
   * <p>NOTE(review): {@code loopStreams} is presumably filled by {@code detectLoops};
   * that method is not visible here - confirm.
   *
   * @param dag  the DAG to populate
   * @param conf configuration, passed through to {@code detectLoops}
   * @throws IllegalArgumentException if a port object is {@code null} or a loop stream
   *                                  has no sink
   */
  public void populateDAG(DAG dag, Configuration conf) {
    // Loop detection works on a rebuilt copy rather than on this.dag itself.
    LogicalPlan loopDetectionPlan = copyTopologyForLoopDetection();
    detectLoops(loopDetectionPlan, conf);

    // Reconstruct the DAG.
    copyOperators(dag);
    // DAG-level setting: it does not depend on any single operator, so set it once.
    dag.setAttribute(Context.OperatorContext.TIMEOUT_WINDOW_COUNT, 300);
    copyStreams(dag);
    dag.setAttribute(Context.DAGContext.APPLICATION_NAME, appName);
  }

  /** Rebuilds the operators and streams of {@code this.dag} in a fresh {@link LogicalPlan}. */
  private LogicalPlan copyTopologyForLoopDetection() {
    LogicalPlan copy = new LogicalPlan();
    for (OperatorMeta o : this.dag.getAllOperators()) {
      copy.addOperator(o.getName(), o.getOperator());
    }
    for (StreamMeta s : this.dag.getAllStreams()) {
      connectStream(copy, s);
    }
    return copy;
  }

  /** Copies every operator, its attributes and its input-port attributes into {@code dag}. */
  @SuppressWarnings({ "unchecked", "rawtypes" })
  private void copyOperators(DAG dag) {
    for (OperatorMeta o : this.dag.getAllOperators()) {
      dag.addOperator(o.getName(), o.getOperator());
      // The attribute keys are wildcard-typed, so a raw cast is needed to pair each key
      // with its stored value.
      for (Entry<Attribute<?>, Object> attr : o.getAttributes().entrySet()) {
        dag.setAttribute(o.getOperator(), (Attribute) attr.getKey(), attr.getValue());
      }
      for (InputPortMeta meta : o.getInputStreams().keySet()) {
        for (Entry<Attribute<?>, Object> entry : meta.getAttributes().entrySet()) {
          dag.setInputPortAttribute(meta.getPortObject(), (Attribute) entry.getKey(), entry.getValue());
        }
      }
    }
  }

  /** Copies every stream into {@code dag}, routing streams in {@code loopStreams} through a delay. */
  private void copyStreams(DAG dag) {
    for (StreamMeta s : this.dag.getAllStreams()) {
      if (loopStreams.contains(s)) {
        routeThroughDelay(dag, s);
      } else {
        connectStream(dag, s);
      }
    }
  }

  /**
   * Replaces the direct connection of {@code s} by source -> delay operator -> first sink and
   * sets a {@link JavaSerializationStreamCodec} on both the delay input and that sink.
   *
   * <p>NOTE(review): only the first sink of a loop stream is rewired; any further sinks are
   * not connected. Confirm that loop streams always have exactly one sink.
   */
  @SuppressWarnings("unchecked")
  private void routeThroughDelay(DAG dag, StreamMeta s) {
    // Fail before touching the DAG instead of after the delay operator has been added.
    Preconditions.checkArgument(!s.getSinks().isEmpty(), "Loop stream %s has no sink", s.getName());
    DefaultOutputPort<ContentEvent> source = (DefaultOutputPort<ContentEvent>) s.getSource().getPortObject();
    DefaultInputPort<ContentEvent> sink = (DefaultInputPort<ContentEvent>) s.getSinks().get(0).getPortObject();

    // Add delay Operator
    DelayOperatorSerializable<ContentEvent> d = dag.addOperator("Delay_" + s.getName(),
        new DelayOperatorSerializable<ContentEvent>());
    dag.addStream("Delay" + s.getName() + "toDelay", source, d.input);
    dag.addStream("Delay" + s.getName() + "fromDelay", d.output, sink);
    dag.setInputPortAttribute(d.input, Context.PortContext.STREAM_CODEC,
        new JavaSerializationStreamCodec<ContentEvent>());
    dag.setInputPortAttribute(sink, Context.PortContext.STREAM_CODEC,
        new JavaSerializationStreamCodec<ContentEvent>());
  }

  /**
   * Adds the stream described by {@code s} to {@code target}: one stream, one source and all
   * of its sinks. A stream without sinks is skipped.
   *
   * <p>The stream is created once and each sink is attached to it, so a stream with several
   * sinks does not register the same stream id more than once.
   */
  private static void connectStream(DAG target, StreamMeta s) {
    if (s.getSinks().isEmpty()) {
      return;
    }
    Operator.OutputPort<?> out = s.getSource().getPortObject();
    Preconditions.checkArgument(out != null);
    // Validate every sink before mutating the target so a bad port leaves no partial stream.
    for (InputPortMeta sinkMeta : s.getSinks()) {
      Preconditions.checkArgument(sinkMeta.getPortObject() != null);
    }

    DAG.StreamMeta stream = target.addStream(s.getName());
    stream.setSource(out);
    for (InputPortMeta sinkMeta : s.getSinks()) {
      stream.addSink(sinkMeta.getPortObject());
    }
  }