in src/main/java/org/apache/fluo/yarn/FluoYarnLauncher.java [95:164]
public static void main(String[] args) throws Exception {
if (args.length != 5) {
System.err.println("Invalid arguments");
System.exit(-1);
}
String connProps = args[0];
String yarnProps = args[1];
String logProps = args[2];
String appName = args[3];
String jarPath = args[4];
FluoYarnEnv env = new FluoYarnEnv(yarnProps, connProps, logProps, appName, jarPath);
BundledJarRunner.Arguments oracleArgs =
new BundledJarRunner.Arguments.Builder().setJarFileName(env.getBundledJarName())
.setLibFolder("lib").setMainClassName("org.apache.fluo.command.FluoOracle")
.setMainArgs(new String[] {"-a", appName}).createArguments();
BundledJarRunner.Arguments workerArgs =
new BundledJarRunner.Arguments.Builder().setJarFileName(env.getBundledJarName())
.setLibFolder("lib").setMainClassName("org.apache.fluo.command.FluoWorker")
.setMainArgs(new String[] {"-a", appName}).createArguments();
TwillRunnerService twillRunner =
new YarnTwillRunnerService(env.getYarnConfiguration(), env.getZookeepers());
twillRunner.start();
TwillPreparer preparer = twillRunner.prepare(new FluoYarnApp(env))
.addJVMOptions("-Dlog4j.configuration=file:$PWD/" + LOG4J_PROPS)
.addJVMOptions(("-Dfluo.conn.props=" + CONN_PROPS))
.withArguments(ORACLE_ID, oracleArgs.toArray())
.withArguments(WORKER_ID, workerArgs.toArray());
if (env.getYarnQueue() != null) {
preparer.setSchedulerQueue(env.getYarnQueue());
}
if (env.getWorkerReservedMemory() != null) {
preparer.withConfiguration(WORKER_ID, Collections
.singletonMap(Configs.Keys.JAVA_RESERVED_MEMORY_MB, env.getWorkerReservedMemory()));
}
if (env.getOracleReservedMemory() != null) {
preparer.withConfiguration(ORACLE_ID, Collections
.singletonMap(Configs.Keys.JAVA_RESERVED_MEMORY_MB, env.getOracleReservedMemory()));
}
TwillController controller = preparer.start();
ResourceReport report = controller.getResourceReport();
log.info("Waiting for Fluo application '{}' to start in YARN...", appName);
while (report == null) {
Thread.sleep(500);
report = controller.getResourceReport();
}
String appID = report.getApplicationId();
log.info("Fluo application '{}' has started in YARN with ID '{}'", appName, appID);
log.info("Waiting for all containers of Fluo application '{}' to start in YARN...", appName);
int numRunning = getNumRunning(controller);
while (numRunning != env.getTotalInstances()) {
log.info("{} of {} containers have started in YARN", numRunning, env.getTotalInstances());
Thread.sleep(2000);
numRunning = getNumRunning(controller);
}
log.info("{} of {} containers have started in YARN", numRunning, env.getTotalInstances());
log.info("Fluo application '{}' has successfully started in YARN with ID '{}'", appName, appID);
}