private static void parseArguments()

in samoa-samza/src/main/java/org/apache/samoa/SamzaDoTask.java [145:218]


  /**
   * Pulls the Samza-specific {@code FLAG=VALUE} options out of {@code args}.
   *
   * <p>Each argument is trimmed and split on its first {@code '='}. When the part before the
   * {@code '='} equals one of the known flags, the part after it is stored in the matching static
   * field and the argument is removed from {@code args}. Arguments without an {@code '='} or with
   * an unrecognised flag are left in place.
   *
   * <p>The list is walked from its last element to its first, so if a flag appears more than once
   * the occurrence closest to the start of the list is processed last and its value is kept.
   *
   * @param args argument list; it is modified in place, so it must support {@code remove(int)}
   * @throws NumberFormatException if the value of an integer-valued flag cannot be parsed by
   *           {@link Integer#parseInt(String)}; the offending argument is not removed
   */
  private static void parseArguments(List<String> args) {
    // Going backwards means a removal never shifts an index that is still to be visited.
    int idx = args.size();
    while (--idx >= 0) {
      String[] keyValue = args.get(idx).trim().split("=", 2);
      if (keyValue.length < 2) {
        // No '=' present, so this cannot be one of our options.
        continue;
      }

      String flag = keyValue[0];
      String value = keyValue[1];
      boolean consumed = true;

      if (flag.equals(YARN_CONF_FLAG)) {
        // YARN config folder which contains conf/core-site.xml,
        // conf/hdfs-site.xml, conf/yarn-site.xml
        yarnConfHome = value;
      } else if (flag.equals(ZK_FLAG)) {
        // host:port for zookeeper cluster
        zookeeper = value;
      } else if (flag.equals(KAFKA_FLAG)) {
        // host:port,... for kafka broker(s)
        kafka = value;
      } else if (flag.equals(MODE_FLAG)) {
        // whether we are running Samza in Local mode or YARN mode
        isLocal = isLocalMode(value);
      } else if (flag.equals(AM_MEMORY_FLAG)) {
        // memory requirement for YARN application master
        amMem = Integer.parseInt(value);
      } else if (flag.equals(CONTAINER_MEMORY_FLAG)) {
        // memory requirement for YARN worker container
        containerMem = Integer.parseInt(value);
      } else if (flag.equals(JAR_PACKAGE_FLAG)) {
        // the path to JAR archive that we need to upload to HDFS
        jarPackagePath = value;
      } else if (flag.equals(SAMOA_HDFS_DIR_FLAG)) {
        // the HDFS dir for SAMOA files; an empty value is recorded as null
        samoaHDFSDir = value.isEmpty() ? null : value;
      } else if (flag.equals(PI_PER_CONTAINER_FLAG)) {
        // number of max PI instances per container; this will be used to
        // compute the number of containers AM will request for the job
        piPerContainer = Integer.parseInt(value);
      } else if (flag.equals(KAFKA_REPLICATION_FLAG)) {
        // kafka streams replication factor
        kafkaReplicationFactor = Integer.parseInt(value);
      } else if (flag.equals(CHECKPOINT_FREQ_FLAG)) {
        // checkpoint frequency in ms
        checkpointFrequency = Integer.parseInt(value);
      } else if (flag.equals(KRYO_REGISTER_FLAG)) {
        // the file contains registration information for Kryo serializer
        kryoRegisterFile = value;
      } else {
        // Unknown flag: keep the argument in the list.
        consumed = false;
      }

      if (consumed) {
        args.remove(idx);
      }
    }
  }