in samoa-samza/src/main/java/org/apache/samoa/SamzaDoTask.java [145:218]
private static void parseArguments(List<String> args) {
for (int i = args.size() - 1; i >= 0; i--) {
String arg = args.get(i).trim();
String[] splitted = arg.split("=", 2);
if (splitted.length >= 2) {
// YARN config folder which contains conf/core-site.xml,
// conf/hdfs-site.xml, conf/yarn-site.xml
if (splitted[0].equals(YARN_CONF_FLAG)) {
yarnConfHome = splitted[1];
args.remove(i);
}
// host:port for zookeeper cluster
else if (splitted[0].equals(ZK_FLAG)) {
zookeeper = splitted[1];
args.remove(i);
}
// host:port,... for kafka broker(s)
else if (splitted[0].equals(KAFKA_FLAG)) {
kafka = splitted[1];
args.remove(i);
}
// whether we are running Samza in Local mode or YARN mode
else if (splitted[0].equals(MODE_FLAG)) {
isLocal = isLocalMode(splitted[1]);
args.remove(i);
}
// memory requirement for YARN application master
else if (splitted[0].equals(AM_MEMORY_FLAG)) {
amMem = Integer.parseInt(splitted[1]);
args.remove(i);
}
// memory requirement for YARN worker container
else if (splitted[0].equals(CONTAINER_MEMORY_FLAG)) {
containerMem = Integer.parseInt(splitted[1]);
args.remove(i);
}
// the path to JAR archive that we need to upload to HDFS
else if (splitted[0].equals(JAR_PACKAGE_FLAG)) {
jarPackagePath = splitted[1];
args.remove(i);
}
// the HDFS dir for SAMOA files
else if (splitted[0].equals(SAMOA_HDFS_DIR_FLAG)) {
samoaHDFSDir = splitted[1];
if (samoaHDFSDir.length() < 1)
samoaHDFSDir = null;
args.remove(i);
}
// number of max PI instances per container
// this will be used to compute the number of containers
// AM will request for the job
else if (splitted[0].equals(PI_PER_CONTAINER_FLAG)) {
piPerContainer = Integer.parseInt(splitted[1]);
args.remove(i);
}
// kafka streams replication factor
else if (splitted[0].equals(KAFKA_REPLICATION_FLAG)) {
kafkaReplicationFactor = Integer.parseInt(splitted[1]);
args.remove(i);
}
// checkpoint frequency in ms
else if (splitted[0].equals(CHECKPOINT_FREQ_FLAG)) {
checkpointFrequency = Integer.parseInt(splitted[1]);
args.remove(i);
}
// the file contains registration information for Kryo serializer
else if (splitted[0].equals(KRYO_REGISTER_FLAG)) {
kryoRegisterFile = splitted[1];
args.remove(i);
}
}
}
}