public static void setup()

in client/src/main/java/org/apache/nemo/client/JobLauncher.java [126:179]


  /**
   * Sets up the client for a job: builds and validates the job configuration, starts the RPC server
   * that receives driver messages, merges the driver/deploy-mode configurations, and submits the driver.
   *
   * <p>On successful return the static fields {@code builtJobConf}, {@code driverRPCServer},
   * {@code jobAndDriverConf}, {@code deployModeConf}, {@code driverReadyLatch}, {@code jobDoneLatch}
   * and {@code driverLauncher} have been (re)assigned and {@code isSetUp} is {@code true}.
   * {@code isSetUp} is only set once the driver has actually been submitted, so a failed setup
   * is not mistaken for a completed one.
   *
   * @param args command-line style arguments forwarded to {@code getJobConf}.
   * @throws InjectionException       propagated from the configuration / launcher calls made here.
   * @throws ClassNotFoundException   propagated from the configuration calls made here.
   * @throws IOException              propagated from the configuration calls made here.
   * @throws InvalidUserMainException propagated from the configuration calls made here.
   * @throws RuntimeException         if any of the configurations needed to launch the driver is null.
   */
  public static void setup(final String[] args)
    throws InjectionException, ClassNotFoundException, IOException, InvalidUserMainException {
    // Get Job and Driver Confs
    LOG.info("Project Root Path: {}", Util.fetchProjectRootPath());
    builtJobConf = getJobConf(args);
    validateJobConfig(builtJobConf);

    // Create the latches BEFORE the handlers that count them down are registered and the server is started.
    // The handlers read these static fields when a message arrives, so they must never observe a null latch
    // or a latch left over from a previous setup.
    driverReadyLatch = new CountDownLatch(1);
    jobDoneLatch = new CountDownLatch(1);

    // Registers actions for launching the DAG.
    LOG.info("Launching RPC Server");
    driverRPCServer = new DriverRPCServer();
    driverRPCServer
      .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> {
        // Intentionally a no-op: nothing is done on DriverStarted.
      })
      .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, event -> driverReadyLatch.countDown())
      .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event -> jobDoneLatch.countDown())
      // NOTE(review): this uses Java-native deserialization on data received over RPC; that is only safe
      // if the driver is trusted - confirm, or restrict the accepted classes.
      .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> COLLECTED_DATA.addAll(
        SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()))))
      .registerHandler(ControlMessage.DriverToClientMessageType.LaunchOptimization, message ->
        ClientUtils.handleOptimizationType(message, driverRPCServer))
      .run();

    final Configuration driverConf = getDriverConf(builtJobConf);
    final Configuration driverNcsConf = getDriverNcsConf();
    final Configuration driverMessageConfig = getDriverMessageConf();
    // Fallback executor resource specification, passed to getJSONConf as the default contents.
    final String defaultExecutorResourceConfig = "[{\"type\":\"Transient\",\"memory_mb\":512,\"capacity\":5},"
      + "{\"type\":\"Reserved\",\"memory_mb\":512,\"capacity\":5}]";
    final Configuration executorResourceConfig = getJSONConf(builtJobConf, JobConf.ExecutorJSONPath.class,
      JobConf.ExecutorJSONContents.class, defaultExecutorResourceConfig);
    final Configuration bandwidthConfig = getJSONConf(builtJobConf, JobConf.BandwidthJSONPath.class,
      JobConf.BandwidthJSONContents.class, "");
    final Configuration clientConf = getClientConf();
    final Configuration schedulerConf = getSchedulerConf(builtJobConf);

    // Merge Job and Driver Confs
    // The RPC server's listening configuration is included so the driver side receives it.
    jobAndDriverConf = Configurations.merge(builtJobConf, driverConf, driverNcsConf, driverMessageConfig,
      executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration(), schedulerConf);

    // Get DeployMode Conf
    deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf), clientConf);

    // Start Driver and launch user program.
    if (jobAndDriverConf == null || deployModeConf == null || builtJobConf == null) {
      throw new RuntimeException("Configuration for launching driver is not ready");
    }

    // Launch driver
    LOG.info("Launching driver");
    driverLauncher = DriverLauncher.getLauncher(deployModeConf);
    // The second argument is presumably a wait time in milliseconds - confirm against DriverLauncher.submit.
    driverLauncher.submit(jobAndDriverConf, 500);
    // Mark setup as complete only after the driver has been submitted without an exception.
    isSetUp = true;
    // When the driver is up and the resource is ready, the DriverReady message is delivered.
  }