in client/src/main/java/org/apache/nemo/client/JobLauncher.java [266:316]
public static void launchDAG(final IRDAG dag,
final Map<Serializable, Object> broadcastVariables,
final String jobId) {
// launch driver if it hasn't been already
if (!isSetUp) {
try {
setup(new String[]{"-job_id", jobId});
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// Wait until the driver is ready.
try {
LOG.info("Waiting for the driver to be ready");
driverReadyLatch.await();
} catch (final InterruptedException e) {
LOG.warn(INTERRUPTED, e);
// clean up state...
Thread.currentThread().interrupt();
}
LOG.info("Launching DAG...");
serializedDAG = Base64.getEncoder().encodeToString(SerializationUtils.serialize(dag));
if (jobDoneLatch.getCount() == 0) { // when this is not the first execution.
jobDoneLatch = new CountDownLatch(1);
}
driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
.setType(ControlMessage.ClientToDriverMessageType.LaunchDAG)
.setLaunchDAG(ControlMessage.LaunchDAGMessage.newBuilder()
.setDag(serializedDAG)
.setBroadcastVars(ByteString.copyFrom(SerializationUtils.serialize((Serializable) broadcastVariables)))
.build())
.build());
// Wait for the ExecutionDone message from the driver
try {
LOG.info("Waiting for the DAG to finish execution");
jobDoneLatch.await();
} catch (final InterruptedException e) {
LOG.warn(INTERRUPTED, e);
// clean up state...
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
LOG.info("DAG execution done");
// trigger shutdown.
shutdown();
}
}