public MasterMain()

in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java [107:234]


    public MasterMain(
        ConfigurationFactory configFactory,
        MantisPropertiesLoader dynamicPropertiesLoader,
        AuditEventSubscriber auditEventSubscriber) {
        String test = "{\"jobId\":\"sine-function-1\",\"status\":{\"jobId\":\"sine-function-1\",\"stageNum\":1,\"workerIndex\":0,\"workerNumber\":2,\"type\":\"HEARTBEAT\",\"message\":\"heartbeat\",\"state\":\"Noop\",\"hostname\":null,\"timestamp\":1525813363585,\"reason\":\"Normal\",\"payloads\":[{\"type\":\"SubscriptionState\",\"data\":\"false\"},{\"type\":\"IncomingDataDrop\",\"data\":\"{\\\"onNextCount\\\":0,\\\"droppedCount\\\":0}\"}]}}";

        Metrics metrics = new Metrics.Builder()
                .id("MasterMain")
                .addCounter("masterInitSuccess")
                .addCounter("masterInitError")
                .build();
        Metrics m = MetricsRegistry.getInstance().registerAndGet(metrics);
        try {
            ConfigurationProvider.initialize(configFactory);
            this.config = ConfigurationProvider.getConfig();
            leadershipManager = new LeadershipManagerImpl(config, mantisServices);

            Thread t = new Thread(this::shutdown);
            t.setDaemon(true);
            // shutdown hook
            Runtime.getRuntime().addShutdownHook(t);

            final ActorSystem system = ActorSystem.create("MantisMaster");
            // log the configuration of the actor system
            system.logConfiguration();

            // log dead letter messages
            final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class), "MantisDeadLetter");
            system.eventStream().subscribe(actor, DeadLetter.class);

            ActorRef statusEventBrokerActor = system.actorOf(StatusEventBrokerActor.props(), "StatusEventBroker");
            ActorRef auditEventBrokerActor = system.actorOf(AuditEventBrokerActor.props(auditEventSubscriber), "AuditEventBroker");
            final StatusEventSubscriber statusEventSubscriber = new StatusEventSubscriberAkkaImpl(statusEventBrokerActor);
            final AuditEventSubscriber auditEventSubscriberAkka = new AuditEventSubscriberAkkaImpl(auditEventBrokerActor);
            final WorkerEventSubscriber workerEventSubscriber = WorkerRegistryV2.INSTANCE;
            final WorkerMetricsCollector workerMetricsCollector = new WorkerMetricsCollector(
                Duration.ofMinutes(5), // cleanup jobs after 5 minutes
                Duration.ofMinutes(1), // check every 1 minute for jobs to be cleaned up
                Clock.systemDefaultZone());
            mantisServices.addService(BaseService.wrap(workerMetricsCollector));

            // TODO who watches actors created at this level?
            final LifecycleEventPublisher lifecycleEventPublisher =
                new LifecycleEventPublisherImpl(auditEventSubscriberAkka, statusEventSubscriber,
                    workerEventSubscriber.and(workerMetricsCollector));

            storageProvider = new KeyValueBasedPersistenceProvider(this.config.getStorageProvider(), lifecycleEventPublisher);
            final MantisJobStore mantisJobStore = new MantisJobStore(storageProvider);

            final ActorRef jobClusterManagerActor = system.actorOf(JobClustersManagerActor.props(mantisJobStore, lifecycleEventPublisher, config.getJobCostsCalculator(), config.getSlaMaxHeadroomForAccepted()), "JobClustersManager");
            final JobMessageRouter jobMessageRouter = new JobMessageRouterImpl(jobClusterManagerActor);

            // Beginning of new stuff
            Configuration configuration = loadConfiguration();

            final ActorRef resourceClustersHostActor = system.actorOf(
                ResourceClustersHostManagerActor.props(
                    new ResourceClusterProviderAdapter(this.config.getResourceClusterProvider(), system),
                    storageProvider),
                "ResourceClusterHostActor");

            final RpcSystem rpcSystem =
                MantisAkkaRpcSystemLoader.getInstance();
            // the RPCService implementation will only be used for communicating with task executors but not for running a server itself.
            // Thus, there's no need for any valid external and bind addresses.
            final RpcService rpcService =
                RpcUtils.createRemoteRpcService(rpcSystem, configuration, null, "6123", null, Optional.empty());
            final ResourceClusters resourceClusters =
                ResourceClustersAkkaImpl.load(
                    configFactory,
                    rpcService,
                    system,
                    mantisJobStore,
                    jobMessageRouter,
                    resourceClustersHostActor,
                    storageProvider,
                    dynamicPropertiesLoader);

            // end of new stuff
            final MantisSchedulerFactory mantisSchedulerFactory =
                new MantisSchedulerFactoryImpl(system, resourceClusters, new ExecuteStageRequestFactory(getConfig()), jobMessageRouter, getConfig(), MetricsRegistry.getInstance());

            final boolean loadJobsFromStoreOnInit = true;
            final JobClustersManagerService jobClustersManagerService = new JobClustersManagerService(jobClusterManagerActor, mantisSchedulerFactory, loadJobsFromStoreOnInit);

            // start serving metrics
            if (config.getMasterMetricsPort() > 0) {
                new MetricsServerService(config.getMasterMetricsPort(), 1, Collections.emptyMap()).start();
            }
            new MetricsPublisherService(config.getMetricsPublisher(), config.getMetricsPublisherFrequencyInSeconds(),
                    new HashMap<>()).start();

            // services
            mantisServices.addService(jobClustersManagerService);

            // set up leader election
            final ILeaderElectorFactory leaderFactory;
            final String fqcnLeaderFactory = config.getLeaderElectorFactory();
            if(!config.isLocalMode() && ConfigUtils.createInstance(fqcnLeaderFactory, ILeaderElectorFactory.class) instanceof LocalLeaderFactory) {
                logger.warn("local mode is {} and leader factory is {} this configuration is unsafe", config.isLocalMode(), config.getLeaderElectorFactory().getClass().getSimpleName());
                final ZookeeperLeadershipFactory zkLeadership = new ZookeeperLeadershipFactory();
                leaderFactory = zkLeadership;
                monitor = zkLeadership.createLeaderMonitor(config);
                logger.warn("using default non-local Zookeeper leader services you should set: "+
                    "mantis.leader.elector.factory=io.mantisrx.master.zk.ZookeeperLeadershipFactory");
            } else {
                leaderFactory = ConfigUtils.createInstance(fqcnLeaderFactory, ILeaderElectorFactory.class);
                monitor = ConfigUtils.createInstance(config.getLeaderMonitorFactoryName(), ILeaderMonitorFactory.class).createLeaderMonitor(config);
                logger.warn("using leader factory {}", config.isLocalMode());
            }
            monitor.start();
            mantisServices.addService(leaderFactory.createLeaderElector(config, leadershipManager));
            mantisServices.addService(new MasterApiAkkaService(monitor, leadershipManager.getDescription(), jobClusterManagerActor, statusEventBrokerActor,
                resourceClusters, resourceClustersHostActor, config.getApiPort(), storageProvider, lifecycleEventPublisher, leadershipManager));

            if (leaderFactory instanceof LocalLeaderFactory && !config.isLocalMode()) {
                logger.error("local mode is [ {} ] and leader factory is {} this configuration is unsafe", config.isLocalMode(), leaderFactory.getClass().getSimpleName());
                throw new RuntimeException("leader election is local but local mode is not enabled");
            }

            m.getCounter("masterInitSuccess").increment();
        } catch (Exception e) {
            logger.error("caught exception on Mantis Master initialization", e);
            m.getCounter("masterInitError").increment();
            shutdown();
            System.exit(1);
        }
    }