in client/src/main/java/org/apache/nemo/client/JobLauncher.java [126:179]
/**
 * Prepares the client for launching a job: builds and validates the job configuration,
 * starts the driver RPC server with its message handlers, assembles the driver-side and
 * deploy-mode configurations, and finally submits the driver.
 *
 * The checked exceptions below are not raised directly in this method; they propagate
 * from the helpers it calls.
 *
 * @param args the arguments from which the job configuration is built.
 * @throws InjectionException       propagated from the helpers called here.
 * @throws ClassNotFoundException   propagated from the helpers called here.
 * @throws IOException              propagated from the helpers called here.
 * @throws InvalidUserMainException propagated from the helpers called here.
 */
public static void setup(final String[] args)
  throws InjectionException, ClassNotFoundException, IOException, InvalidUserMainException {
  LOG.info("Project Root Path: {}", Util.fetchProjectRootPath());

  // The job configuration comes first: everything below is derived from it.
  builtJobConf = getJobConf(args);
  validateJobConfig(builtJobConf);

  // Start the RPC server and register one handler per driver-to-client message type.
  LOG.info("Launching RPC Server");
  driverRPCServer = new DriverRPCServer();
  driverRPCServer
    .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, started -> {
      // Intentionally a no-op.
    })
    .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, ready -> {
      driverReadyLatch.countDown();
    })
    .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, done -> {
      jobDoneLatch.countDown();
    })
    .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, collected -> {
      // The payload arrives Base64-encoded; decode it, then deserialize into the collected data.
      final byte[] payload = Base64.getDecoder().decode(collected.getDataCollected().getData());
      COLLECTED_DATA.addAll(SerializationUtils.deserialize(payload));
    })
    .registerHandler(ControlMessage.DriverToClientMessageType.LaunchOptimization, request -> {
      ClientUtils.handleOptimizationType(request, driverRPCServer);
    })
    .run();

  // Individual configuration pieces, obtained in the same order as they are merged below.
  final Configuration driverSideConf = getDriverConf(builtJobConf);
  final Configuration ncsConf = getDriverNcsConf();
  final Configuration messageConf = getDriverMessageConf();

  // Executor resource specification used as the default contents passed to getJSONConf.
  final String fallbackExecutorJson = "[{\"type\":\"Transient\",\"memory_mb\":512,\"capacity\":5},"
    + "{\"type\":\"Reserved\",\"memory_mb\":512,\"capacity\":5}]";
  final Configuration executorConf = getJSONConf(
    builtJobConf, JobConf.ExecutorJSONPath.class, JobConf.ExecutorJSONContents.class, fallbackExecutorJson);
  final Configuration bandwidthConf = getJSONConf(
    builtJobConf, JobConf.BandwidthJSONPath.class, JobConf.BandwidthJSONContents.class, "");

  final Configuration clientSideConf = getClientConf();
  final Configuration schedulingConf = getSchedulerConf(builtJobConf);

  // Job + driver configuration, including the RPC server's listening configuration.
  final Configuration rpcListeningConf = driverRPCServer.getListeningConfiguration();
  jobAndDriverConf = Configurations.merge(
    builtJobConf,
    driverSideConf,
    ncsConf,
    messageConf,
    executorConf,
    bandwidthConf,
    rpcListeningConf,
    schedulingConf);

  // Deploy-mode configuration, combined with the client configuration.
  deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf), clientSideConf);

  // Refuse to launch unless every required configuration is present.
  final boolean confsReady = jobAndDriverConf != null && deployModeConf != null && builtJobConf != null;
  if (!confsReady) {
    throw new RuntimeException("Configuration for launching driver is not ready");
  }

  // Create the latches and mark setup as done before the driver is submitted.
  LOG.info("Launching driver");
  driverReadyLatch = new CountDownLatch(1);
  jobDoneLatch = new CountDownLatch(1);
  isSetUp = true;

  driverLauncher = DriverLauncher.getLauncher(deployModeConf);
  driverLauncher.submit(jobAndDriverConf, 500);
  // When the driver is up and the resource is ready, the DriverReady message is delivered.
}