in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java [107:234]
public MasterMain(
ConfigurationFactory configFactory,
MantisPropertiesLoader dynamicPropertiesLoader,
AuditEventSubscriber auditEventSubscriber) {
String test = "{\"jobId\":\"sine-function-1\",\"status\":{\"jobId\":\"sine-function-1\",\"stageNum\":1,\"workerIndex\":0,\"workerNumber\":2,\"type\":\"HEARTBEAT\",\"message\":\"heartbeat\",\"state\":\"Noop\",\"hostname\":null,\"timestamp\":1525813363585,\"reason\":\"Normal\",\"payloads\":[{\"type\":\"SubscriptionState\",\"data\":\"false\"},{\"type\":\"IncomingDataDrop\",\"data\":\"{\\\"onNextCount\\\":0,\\\"droppedCount\\\":0}\"}]}}";
Metrics metrics = new Metrics.Builder()
.id("MasterMain")
.addCounter("masterInitSuccess")
.addCounter("masterInitError")
.build();
Metrics m = MetricsRegistry.getInstance().registerAndGet(metrics);
try {
ConfigurationProvider.initialize(configFactory);
this.config = ConfigurationProvider.getConfig();
leadershipManager = new LeadershipManagerImpl(config, mantisServices);
Thread t = new Thread(this::shutdown);
t.setDaemon(true);
// shutdown hook
Runtime.getRuntime().addShutdownHook(t);
final ActorSystem system = ActorSystem.create("MantisMaster");
// log the configuration of the actor system
system.logConfiguration();
// log dead letter messages
final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class), "MantisDeadLetter");
system.eventStream().subscribe(actor, DeadLetter.class);
ActorRef statusEventBrokerActor = system.actorOf(StatusEventBrokerActor.props(), "StatusEventBroker");
ActorRef auditEventBrokerActor = system.actorOf(AuditEventBrokerActor.props(auditEventSubscriber), "AuditEventBroker");
final StatusEventSubscriber statusEventSubscriber = new StatusEventSubscriberAkkaImpl(statusEventBrokerActor);
final AuditEventSubscriber auditEventSubscriberAkka = new AuditEventSubscriberAkkaImpl(auditEventBrokerActor);
final WorkerEventSubscriber workerEventSubscriber = WorkerRegistryV2.INSTANCE;
final WorkerMetricsCollector workerMetricsCollector = new WorkerMetricsCollector(
Duration.ofMinutes(5), // cleanup jobs after 5 minutes
Duration.ofMinutes(1), // check every 1 minute for jobs to be cleaned up
Clock.systemDefaultZone());
mantisServices.addService(BaseService.wrap(workerMetricsCollector));
// TODO who watches actors created at this level?
final LifecycleEventPublisher lifecycleEventPublisher =
new LifecycleEventPublisherImpl(auditEventSubscriberAkka, statusEventSubscriber,
workerEventSubscriber.and(workerMetricsCollector));
storageProvider = new KeyValueBasedPersistenceProvider(this.config.getStorageProvider(), lifecycleEventPublisher);
final MantisJobStore mantisJobStore = new MantisJobStore(storageProvider);
final ActorRef jobClusterManagerActor = system.actorOf(JobClustersManagerActor.props(mantisJobStore, lifecycleEventPublisher, config.getJobCostsCalculator(), config.getSlaMaxHeadroomForAccepted()), "JobClustersManager");
final JobMessageRouter jobMessageRouter = new JobMessageRouterImpl(jobClusterManagerActor);
// Beginning of new stuff
Configuration configuration = loadConfiguration();
final ActorRef resourceClustersHostActor = system.actorOf(
ResourceClustersHostManagerActor.props(
new ResourceClusterProviderAdapter(this.config.getResourceClusterProvider(), system),
storageProvider),
"ResourceClusterHostActor");
final RpcSystem rpcSystem =
MantisAkkaRpcSystemLoader.getInstance();
// the RPCService implementation will only be used for communicating with task executors but not for running a server itself.
// Thus, there's no need for any valid external and bind addresses.
final RpcService rpcService =
RpcUtils.createRemoteRpcService(rpcSystem, configuration, null, "6123", null, Optional.empty());
final ResourceClusters resourceClusters =
ResourceClustersAkkaImpl.load(
configFactory,
rpcService,
system,
mantisJobStore,
jobMessageRouter,
resourceClustersHostActor,
storageProvider,
dynamicPropertiesLoader);
// end of new stuff
final MantisSchedulerFactory mantisSchedulerFactory =
new MantisSchedulerFactoryImpl(system, resourceClusters, new ExecuteStageRequestFactory(getConfig()), jobMessageRouter, getConfig(), MetricsRegistry.getInstance());
final boolean loadJobsFromStoreOnInit = true;
final JobClustersManagerService jobClustersManagerService = new JobClustersManagerService(jobClusterManagerActor, mantisSchedulerFactory, loadJobsFromStoreOnInit);
// start serving metrics
if (config.getMasterMetricsPort() > 0) {
new MetricsServerService(config.getMasterMetricsPort(), 1, Collections.emptyMap()).start();
}
new MetricsPublisherService(config.getMetricsPublisher(), config.getMetricsPublisherFrequencyInSeconds(),
new HashMap<>()).start();
// services
mantisServices.addService(jobClustersManagerService);
// set up leader election
final ILeaderElectorFactory leaderFactory;
final String fqcnLeaderFactory = config.getLeaderElectorFactory();
if(!config.isLocalMode() && ConfigUtils.createInstance(fqcnLeaderFactory, ILeaderElectorFactory.class) instanceof LocalLeaderFactory) {
logger.warn("local mode is {} and leader factory is {} this configuration is unsafe", config.isLocalMode(), config.getLeaderElectorFactory().getClass().getSimpleName());
final ZookeeperLeadershipFactory zkLeadership = new ZookeeperLeadershipFactory();
leaderFactory = zkLeadership;
monitor = zkLeadership.createLeaderMonitor(config);
logger.warn("using default non-local Zookeeper leader services you should set: "+
"mantis.leader.elector.factory=io.mantisrx.master.zk.ZookeeperLeadershipFactory");
} else {
leaderFactory = ConfigUtils.createInstance(fqcnLeaderFactory, ILeaderElectorFactory.class);
monitor = ConfigUtils.createInstance(config.getLeaderMonitorFactoryName(), ILeaderMonitorFactory.class).createLeaderMonitor(config);
logger.warn("using leader factory {}", config.isLocalMode());
}
monitor.start();
mantisServices.addService(leaderFactory.createLeaderElector(config, leadershipManager));
mantisServices.addService(new MasterApiAkkaService(monitor, leadershipManager.getDescription(), jobClusterManagerActor, statusEventBrokerActor,
resourceClusters, resourceClustersHostActor, config.getApiPort(), storageProvider, lifecycleEventPublisher, leadershipManager));
if (leaderFactory instanceof LocalLeaderFactory && !config.isLocalMode()) {
logger.error("local mode is [ {} ] and leader factory is {} this configuration is unsafe", config.isLocalMode(), leaderFactory.getClass().getSimpleName());
throw new RuntimeException("leader election is local but local mode is not enabled");
}
m.getCounter("masterInitSuccess").increment();
} catch (Exception e) {
logger.error("caught exception on Mantis Master initialization", e);
m.getCounter("masterInitError").increment();
shutdown();
System.exit(1);
}
}