in samoa-samza/src/main/java/org/apache/samoa/SamzaDoTask.java [84:139]
/**
 * Command-line entry point: instantiates a {@code Task} from the arguments, initializes it,
 * configures the {@code SamzaEngine} and submits the task's topology.
 *
 * <p>Steps, in order:
 * <ol>
 * <li>hand a mutable copy of the arguments to {@code parseArguments} and continue with
 * whatever is left in that list afterwards;</li>
 * <li>join the remaining arguments into one CLI string and build the task from it;</li>
 * <li>when not running locally, upload the JAR package to HDFS;</li>
 * <li>push all settings into the engine and submit the topology.</li>
 * </ol>
 *
 * <p>Every failure is reported on standard output (and to the logger where the original code
 * did so) and the method simply returns; it does not call {@code System.exit}.
 *
 * @param args raw command-line arguments
 */
public static void main(String[] args) {
  // Read arguments. parseArguments works on a mutable copy; the list is read back afterwards,
  // so it presumably removes the options it consumes -- confirm against parseArguments.
  List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
  parseArguments(tmpArgs);
  args = tmpArgs.toArray(new String[0]);

  // Init Task. Each argument is prefixed with a space, so a non-empty CLI string starts with
  // a leading blank; this is kept exactly as before.
  StringBuilder cliBuilder = new StringBuilder();
  for (String arg : args) {
    cliBuilder.append(" ").append(arg);
  }
  String cliString = cliBuilder.toString();
  logger.debug("Command line string = {}", cliString);
  System.out.println("Command line string = " + cliString);

  Task task = null;
  try {
    task = (Task) ClassOption.cliStringToObject(cliString, Task.class, null);
    // Kept inside the try block: if no task object came back, the dereference below fails
    // and is reported through the same error path as any other instantiation problem.
    logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName());
  } catch (Exception e) {
    logger.error("Fail to initialize the task", e);
    System.out.println("Fail to initialize the task" + e);
    return;
  }
  task.setFactory(new SamzaComponentFactory());
  task.init();

  // Upload JAR file to HDFS (only needed when not running in local mode).
  String hdfsPath = null;
  if (!isLocal) {
    // Guard against a missing JAR path: FileSystem.getPath(null) would otherwise abort the
    // program with a bare NullPointerException instead of a readable message.
    if (jarPackagePath == null) {
      logger.error("Fail uploading JAR file to HDFS: no JAR package path was provided");
      System.out.println("Fail uploading JAR file to HDFS: no JAR package path was provided.");
      return;
    }
    Path path = FileSystems.getDefault().getPath(jarPackagePath);
    hdfsPath = uploadJarToHDFS(path.toFile());
    if (hdfsPath == null) {
      System.out.println("Fail uploading JAR file \"" + path.toAbsolutePath().toString() + "\" to HDFS.");
      return;
    }
  }

  // Set parameters. In local mode hdfsPath is still null when handed to setYarnPackage.
  SamzaEngine.getEngine()
      .setLocalMode(isLocal)
      .setZooKeeper(zookeeper)
      .setKafka(kafka)
      .setYarnPackage(hdfsPath)
      .setKafkaReplicationFactor(kafkaReplicationFactor)
      .setConfigHome(yarnConfHome)
      .setAMMemory(amMem)
      .setContainerMemory(containerMem)
      .setPiPerContainerRatio(piPerContainer)
      .setKryoRegisterFile(kryoRegisterFile)
      .setCheckpointFrequency(checkpointFrequency);

  // Submit topology
  SamzaEngine.submitTopology((SamzaTopology) task.getTopology());
}