private Map getMapForPI()

in samoa-samza/src/main/java/org/apache/samoa/utils/SamzaConfigFactory.java [180:246]


  /**
   * Builds the job configuration map for a single processing item.
   *
   * <p>Starting from {@code getBasicSystemConfig()}, the method fills in, in order:
   * <ol>
   *   <li>the job name (from {@code pi.getName()}) and the task class
   *       ({@code SamzaProcessingItem});</li>
   *   <li>the task inputs, as a {@code COMMA}-joined list of
   *       {@code system + DOT + stream} entries, one per input stream;</li>
   *   <li>the processor file name and file system;</li>
   *   <li>one Kafka system per distinct system name: {@code "kafka0"} first, then the systems
   *       of the output streams, then the systems of the input streams;</li>
   *   <li>the checkpointing settings, which use {@code "kafka0"};</li>
   *   <li>the number of containers.</li>
   * </ol>
   *
   * <p>A Kafka system is configured only the first time its name is seen, so the settings of
   * the first registration win (for example, an output stream that uses {@code "kafka0"} does
   * not override the default registration made here).
   *
   * @param pi         processing item whose name, streams and parallelism drive the configuration
   * @param filename   value passed to {@code setFileName}
   * @param filesystem value passed to {@code setFileSystem}
   * @return the populated configuration map
   * @throws NullPointerException if a stream reports a {@code null} system name
   * @throws Exception declared by the original signature; which helper may raise it is not
   *                   visible from this method
   */
  private Map<String, String> getMapForPI(SamzaProcessingItem pi, String filename, String filesystem) throws Exception {
    Map<String, String> map = getBasicSystemConfig();

    // Set job name, task class, task inputs (from SamzaProcessingItem)
    setJobName(map, pi.getName());
    setTaskClass(map, SamzaProcessingItem.class.getName());

    StringBuilder streamNames = new StringBuilder();
    boolean needsSeparator = false;
    for (SamzaSystemStream stream : pi.getInputStreams()) {
      if (needsSeparator) {
        streamNames.append(COMMA);
      }
      streamNames.append(stream.getSystem()).append(DOT).append(stream.getStream());
      needsSeparator = true;
    }
    setTaskInputs(map, streamNames.toString());

    // Processor file
    setFileName(map, filename);
    setFileSystem(map, filesystem);

    // Names of the Kafka systems configured so far, in registration order.
    List<String> nameList = new ArrayList<String>();
    // Default kafka system: kafka0: sync producer
    // This system is always required: it is used for checkpointing
    nameList.add("kafka0");
    setKafkaSystem(map, "kafka0", this.zookeeper, this.kafkaBrokerList, 1);

    // Output streams: set kafka systems, each with the batch size of the first stream that uses it
    for (SamzaStream stream : pi.getOutputStreams()) {
      String systemName = stream.getSystemName();
      if (addSystemNameIfAbsent(nameList, systemName)) {
        setKafkaSystem(map, systemName, this.zookeeper, this.kafkaBrokerList, stream.getBatchSize());
      }
    }

    // Input streams: set kafka systems (same last argument as the default system)
    for (SamzaSystemStream stream : pi.getInputStreams()) {
      String systemName = stream.getSystem();
      if (addSystemNameIfAbsent(nameList, systemName)) {
        setKafkaSystem(map, systemName, this.zookeeper, this.kafkaBrokerList, 1);
      }
    }

    // Checkpointing
    setValue(map, "task.checkpoint.factory", "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
    setValue(map, "task.checkpoint.system", "kafka0");
    setValue(map, "task.commit.ms", "1000");
    setValue(map, "task.checkpoint.replication.factor", Integer.toString(this.replicationFactor));

    // Number of containers
    setNumberOfContainers(map, pi.getParallelism(), this.piPerContainerRatio);

    return map;
  }

  /**
   * Records {@code systemName} in {@code knownSystems} unless it is already there.
   *
   * @param knownSystems names of the systems registered so far; appended to when the name is new
   * @param systemName   name to register; must not be {@code null}
   * @return {@code true} if the name was new and has been added, {@code false} if it was present
   * @throws NullPointerException if {@code systemName} is {@code null}
   */
  private static boolean addSystemNameIfAbsent(List<String> knownSystems, String systemName) {
    // List.contains would quietly accept null; fail fast instead, as the equals-based search
    // this helper replaces did.
    if (systemName == null) {
      throw new NullPointerException("Kafka system name must not be null");
    }
    if (knownSystems.contains(systemName)) {
      return false;
    }
    knownSystems.add(systemName);
    return true;
  }