in runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java [95:136]
private NemoDriver(final UserApplicationRunner userApplicationRunner,
final RuntimeMaster runtimeMaster,
final NameServer nameServer,
final LocalAddressProvider localAddressProvider,
final JobMessageObserver client,
final ClientRPC clientRPC,
final DataPlaneConf dataPlaneConf,
@Parameter(JobConf.ExecutorJSONContents.class) final String resourceSpecificationString,
@Parameter(JobConf.BandwidthJSONContents.class) final String bandwidthString,
@Parameter(JobConf.JobId.class) final String jobId,
@Parameter(JobConf.StreamMetricPeriod.class) final int streamMetricPeriod,
@Parameter(JobConf.LatencyMarkPeriod.class) final int latencyMarkPeriod,
@Parameter(JobConf.FileDirectory.class) final String localDirectory,
@Parameter(JobConf.GlusterVolumeDirectory.class) final String glusterDirectory) {
IdManager.setInDriver();
this.userApplicationRunner = userApplicationRunner;
this.runtimeMaster = runtimeMaster;
this.nameServer = nameServer;
this.localAddressProvider = localAddressProvider;
this.resourceSpecificationString = resourceSpecificationString;
this.jobId = jobId;
this.streamMetricPeriod = streamMetricPeriod;
this.latencyMarkPeriod = latencyMarkPeriod;
this.localDirectory = localDirectory;
this.glusterDirectory = glusterDirectory;
this.handler = new RemoteClientMessageLoggingHandler(client);
this.clientRPC = clientRPC;
this.dataPlaneConf = dataPlaneConf;
// TODO #69: Support job-wide execution property
ResourceSitePass.setBandwidthSpecificationString(bandwidthString);
clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.Notification, this::handleNotification);
clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.LaunchDAG, message -> {
startSchedulingUserDAG(message.getLaunchDAG().getDag());
final Map<Serializable, Object> broadcastVars =
SerializationUtils.deserialize(message.getLaunchDAG().getBroadcastVars().toByteArray());
BroadcastManagerMaster.registerBroadcastVariablesFromClient(broadcastVars);
});
clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.DriverShutdown, message -> shutdown());
// Send DriverStarted message to the client
clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
.setType(ControlMessage.DriverToClientMessageType.DriverStarted).build());
}