public static void launchDAG()

in client/src/main/java/org/apache/nemo/client/JobLauncher.java [266:316]


  public static void launchDAG(final IRDAG dag,
                               final Map<Serializable, Object> broadcastVariables,
                               final String jobId) {
    // launch driver if it hasn't been already
    if (!isSetUp) {
      try {
        setup(new String[]{"-job_id", jobId});
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    // Wait until the driver is ready.
    try {
      LOG.info("Waiting for the driver to be ready");
      driverReadyLatch.await();
    } catch (final InterruptedException e) {
      LOG.warn(INTERRUPTED, e);
      // clean up state...
      Thread.currentThread().interrupt();
    }

    LOG.info("Launching DAG...");
    serializedDAG = Base64.getEncoder().encodeToString(SerializationUtils.serialize(dag));
    if (jobDoneLatch.getCount() == 0) {  // when this is not the first execution.
      jobDoneLatch = new CountDownLatch(1);
    }

    driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
      .setType(ControlMessage.ClientToDriverMessageType.LaunchDAG)
      .setLaunchDAG(ControlMessage.LaunchDAGMessage.newBuilder()
        .setDag(serializedDAG)
        .setBroadcastVars(ByteString.copyFrom(SerializationUtils.serialize((Serializable) broadcastVariables)))
        .build())
      .build());

    // Wait for the ExecutionDone message from the driver
    try {
      LOG.info("Waiting for the DAG to finish execution");
      jobDoneLatch.await();
    } catch (final InterruptedException e) {
      LOG.warn(INTERRUPTED, e);
      // clean up state...
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } finally {
      LOG.info("DAG execution done");
      // trigger shutdown.
      shutdown();
    }
  }