in runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java [133:185]
private RuntimeMaster(final Scheduler scheduler,
                      final ContainerManager containerManager,
                      final ExecutorRegistry executorRegistry,
                      final MetricMessageHandler metricMessageHandler,
                      final MessageEnvironment masterMessageEnvironment,
                      final MetricManagerMaster metricManagerMaster,
                      final ClientRPC clientRPC,
                      final PlanStateManager planStateManager,
                      @Parameter(JobConf.JobId.class) final String jobId,
                      @Parameter(JobConf.DBEnabled.class) final Boolean dbEnabled,
                      @Parameter(JobConf.DBAddress.class) final String dbAddress,
                      @Parameter(JobConf.DBId.class) final String dbId,
                      @Parameter(JobConf.DBPasswd.class) final String dbPassword,
                      @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
  // Construction happens in three phases:
  //   1. create the executors,
  //   2. store every collaborator / job setting and build the internal state,
  //   3. only then register callbacks that other threads may invoke.
  // Phase 3 is deliberately last so that a message or timer tick arriving while this constructor
  // is still running cannot observe fields that have not been assigned yet.

  // We would like to use a single thread for runtime master operations
  // since the processing logic in master takes a very short amount of time
  // compared to the job completion times of executed jobs
  // and keeping it single threaded removes the complexity of multi-thread synchronization.
  this.runtimeMasterThread =
    Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "RuntimeMaster thread"));
  // Dedicated timer thread; the periodic task itself is scheduled at the end of the constructor.
  this.speculativeTaskCloningThread = Executors
    .newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, "SpeculativeTaskCloning thread"));

  // Collaborators handed in by the caller.
  this.scheduler = scheduler;
  this.containerManager = containerManager;
  this.executorRegistry = executorRegistry;
  this.metricMessageHandler = metricMessageHandler;
  this.masterMessageEnvironment = masterMessageEnvironment;
  this.clientRPC = clientRPC;
  this.metricManagerMaster = metricManagerMaster;

  // Job-level settings.
  this.jobId = jobId;
  this.dagDirectory = dagDirectory;
  this.dbEnabled = dbEnabled;
  this.dbAddress = dbAddress;
  this.dbId = dbId;
  this.dbPassword = dbPassword;

  // Internal state.
  this.irVertices = new HashSet<>();
  this.resourceRequestCount = new AtomicInteger(0);
  this.objectMapper = new ObjectMapper();
  this.metricServer = startRestMetricServer();
  this.metricStore = MetricStore.getStore();
  this.planStateManager = planStateManager;
  // A latch with count 0 is already released, so awaiting it returns immediately.
  this.metricCountDownLatch = new CountDownLatch(0);

  // Register the control-message receiver only after all fields above are set.
  // NOTE(review): the receiver presumably reads this instance's state when handling messages -
  // confirm against MasterControlMessageReceiver.
  this.masterMessageEnvironment
    .setupListener(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID, new MasterControlMessageReceiver());

  // Check for speculative execution periodically (every SPECULATION_CHECKING_PERIOD_MS).
  // The timer thread only hands the check over to the runtime master thread, so the scheduler
  // call itself runs on the single "RuntimeMaster thread".
  this.speculativeTaskCloningThread.scheduleAtFixedRate(
    () -> this.runtimeMasterThread.submit(scheduler::onSpeculativeExecutionCheck),
    SPECULATION_CHECKING_PERIOD_MS,
    SPECULATION_CHECKING_PERIOD_MS,
    TimeUnit.MILLISECONDS);
}