public static void main()

in samoa-samza/src/main/java/org/apache/samoa/SamzaDoTask.java [84:139]


  /**
   * Command-line entry point: builds a {@link Task} from the arguments, prepares it with a
   * {@link SamzaComponentFactory}, and submits its topology through {@link SamzaEngine}.
   *
   * <p>Steps, in order:
   * <ol>
   *   <li>{@code parseArguments} is given a mutable copy of {@code args}; whatever remains in that
   *       list afterwards is joined into the task's command-line string.</li>
   *   <li>The task is instantiated from that string. On failure the error is logged and printed,
   *       and the method returns without submitting anything.</li>
   *   <li>Unless running in local mode, the JAR package is uploaded to HDFS; a {@code null} result
   *       from the upload aborts the run.</li>
   *   <li>The engine is configured from the class-level settings and the topology is submitted.</li>
   * </ol>
   *
   * @param args raw command-line arguments
   */
  public static void main(String[] args) {
    // Work on a mutable copy so parseArguments can modify the list
    // (presumably it strips the engine-specific options -- confirm against parseArguments).
    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
    parseArguments(tmpArgs);

    // Build the task's command-line string once; each argument is prefixed with a single space.
    StringBuilder cliBuilder = new StringBuilder();
    for (String arg : tmpArgs) {
      cliBuilder.append(" ").append(arg);
    }
    String cliString = cliBuilder.toString();
    logger.debug("Command line string = {}", cliString);
    System.out.println("Command line string = " + cliString);

    // Instantiate the task described by the command line.
    Task task = null;
    try {
      task = (Task) ClassOption.cliStringToObject(cliString, Task.class, null);
      logger.info("Successfully instantiated {}", task.getClass().getCanonicalName());
    } catch (Exception e) {
      // Top-level boundary: report the failure and stop instead of propagating.
      logger.error("Fail to initialize the task", e);
      System.out.println("Fail to initialize the task: " + e);
      return;
    }
    task.setFactory(new SamzaComponentFactory());
    task.init();

    // Upload JAR file to HDFS (skipped in local mode, where hdfsPath stays null).
    String hdfsPath = null;
    if (!isLocal) {
      Path path = FileSystems.getDefault().getPath(jarPackagePath);
      hdfsPath = uploadJarToHDFS(path.toFile());
      if (hdfsPath == null) {
        System.out.println("Fail uploading JAR file \"" + path.toAbsolutePath() + "\" to HDFS.");
        return;
      }
    }

    // Set parameters
    SamzaEngine.getEngine()
        .setLocalMode(isLocal)
        .setZooKeeper(zookeeper)
        .setKafka(kafka)
        .setYarnPackage(hdfsPath)
        .setKafkaReplicationFactor(kafkaReplicationFactor)
        .setConfigHome(yarnConfHome)
        .setAMMemory(amMem)
        .setContainerMemory(containerMem)
        .setPiPerContainerRatio(piPerContainer)
        .setKryoRegisterFile(kryoRegisterFile)
        .setCheckpointFrequency(checkpointFrequency);

    // Submit topology
    SamzaEngine.submitTopology((SamzaTopology) task.getTopology());
  }