in samoa-samza/src/main/java/org/apache/samoa/utils/SamzaConfigFactory.java [180:246]
/**
 * Builds the configuration map for the Samza job of a single processing item.
 *
 * <p>Starting from {@code getBasicSystemConfig()}, this sets the job name, the task class, the
 * task inputs, the processor file location, one Kafka system per distinct system name (always
 * including {@code "kafka0"}), the Kafka checkpointing settings and the number of containers.
 *
 * @param pi the processing item whose streams, name and parallelism drive the configuration
 * @param filename value handed to {@code setFileName} for the processor file
 * @param filesystem value handed to {@code setFileSystem} for the processor file
 * @return the populated configuration map
 * @throws Exception as declared by the signature; presumably propagated from the helpers called
 *     here (not visible from this method alone)
 */
private Map<String, String> getMapForPI(SamzaProcessingItem pi, String filename, String filesystem) throws Exception {
  Map<String, String> config = getBasicSystemConfig();

  // Job identity comes from the processing item; every job runs SamzaProcessingItem as its task.
  setJobName(config, pi.getName());
  setTaskClass(config, SamzaProcessingItem.class.getName());

  // task inputs: "<system>" + DOT + "<stream>" for each input stream, separated by COMMA.
  StringBuilder inputList = new StringBuilder();
  boolean needsSeparator = false;
  for (SamzaSystemStream input : pi.getInputStreams()) {
    if (needsSeparator) {
      inputList.append(COMMA);
    }
    inputList.append(input.getSystem() + DOT + input.getStream());
    needsSeparator = true;
  }
  setTaskInputs(config, inputList.toString());

  // Location of the processor file.
  setFileName(config, filename);
  setFileSystem(config, filesystem);

  // "kafka0" (sync producer, batch size 1) is always configured because the checkpoint
  // settings below point at it.
  List<String> registeredSystems = new ArrayList<String>();
  registeredSystems.add("kafka0");
  setKafkaSystem(config, "kafka0", this.zookeeper, this.kafkaBrokerList, 1);

  // Output streams: configure each system name once, using the batch size of the first
  // output stream seen with that name.
  for (SamzaStream output : pi.getOutputStreams()) {
    String systemName = output.getSystemName();
    if (registerKafkaSystemName(registeredSystems, systemName)) {
      setKafkaSystem(config, systemName, this.zookeeper, this.kafkaBrokerList, output.getBatchSize());
    }
  }

  // Input streams: any system not already configured above gets batch size 1.
  for (SamzaSystemStream input : pi.getInputStreams()) {
    String systemName = input.getSystem();
    if (registerKafkaSystemName(registeredSystems, systemName)) {
      setKafkaSystem(config, systemName, this.zookeeper, this.kafkaBrokerList, 1);
    }
  }

  // Kafka-backed checkpointing on the always-present "kafka0" system.
  setValue(config, "task.checkpoint.factory", "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
  setValue(config, "task.checkpoint.system", "kafka0");
  setValue(config, "task.commit.ms", "1000");
  setValue(config, "task.checkpoint.replication.factor", Integer.toString(this.replicationFactor));

  // Container count derived from the item's parallelism.
  setNumberOfContainers(config, pi.getParallelism(), this.piPerContainerRatio);
  return config;
}

/**
 * Appends {@code systemName} to {@code registered} unless an equal name is already present.
 *
 * <p>Comparison is {@code systemName.equals(existing)}, so a null {@code systemName} throws
 * {@link NullPointerException} whenever {@code registered} is non-empty.
 *
 * @param registered names of the Kafka systems configured so far; mutated when a name is added
 * @param systemName candidate system name
 * @return {@code true} if the name was newly added, {@code false} if it was already registered
 */
private static boolean registerKafkaSystemName(List<String> registered, String systemName) {
  for (String existing : registered) {
    if (systemName.equals(existing)) {
      return false;
    }
  }
  registered.add(systemName);
  return true;
}