public boolean init()

in modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java [983:1165]


    /**
     * Initializes this node from the given options.
     *
     * <p>The method stores the options, then sets up (in the order written below) timers, pools, the
     * configuration manager, the apply queue, log/meta storage, the FSM caller, the ballot box, snapshot
     * storage, the current configuration, the RPC client service, the replicator group and the read-only
     * service. It returns {@code false} as soon as one of the checked steps reports failure.
     *
     * <p>The final part of initialization (switching to {@code STATE_FOLLOWER}, starting the snapshot timer,
     * stepping down, optionally calling {@code electSelf()} and completing {@code applyCommittedFuture}) runs
     * in a callback attached to a future that is completed once the FSM caller reports an applied index that
     * is at least the last log index read here. The method itself may therefore return {@code true} before
     * that callback has run.
     *
     * @param opts Node options; checked for non-null, together with its raft options and service factory.
     * @return {@code true} if every synchronous initialization step succeeded, {@code false} otherwise.
     */
    public boolean init(final NodeOptions opts) {
        Requires.requireNonNull(opts, "Null node options");
        Requires.requireNonNull(opts.getRaftOptions(), "Null raft options");
        Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory");
        // Copy the pieces of the options that the node reads directly.
        this.serviceFactory = opts.getServiceFactory();
        this.clock = opts.getClock();
        this.options = opts;
        this.raftOptions = opts.getRaftOptions();
        this.metrics = new NodeMetrics(opts.isEnableMetrics());
        this.serverId.setPriority(opts.getElectionPriority());
        this.electionTimeoutCounter = 0;
        // Listeners are optional in the options; register them only when provided.
        if (opts.getReplicationStateListeners() != null)
            this.replicatorStateListeners.addAll(opts.getReplicationStateListeners());

        // The server id must already be set on this node; initialization is refused otherwise.
        if (this.serverId.isEmpty()) {
            LOG.error("Server ID is empty.");
            return false;
        }

        // Init timers.
        initTimers(opts);

        // Init pools.
        initPools(opts);

        this.configManager = new ConfigurationManager();

        // The apply disruptor is supplied by the options; this node subscribes its own handler under its node id.
        applyDisruptor = opts.getNodeApplyDisruptor();

        applyQueue = applyDisruptor.subscribe(getNodeId(), new LogEntryAndClosureHandler());

        // Expose apply-queue metrics only when a metric registry is available.
        if (this.metrics.getMetricRegistry() != null) {
            this.metrics.getMetricRegistry().register("jraft-node-impl-disruptor",
                new DisruptorMetricSet(this.applyQueue));
        }

        // The FSM caller instance is created before the storages are initialized and is initialized
        // itself only after log and meta storage succeeded.
        this.fsmCaller = new FSMCallerImpl();
        if (!initLogStorage()) {
            LOG.error("Node {} initLogStorage failed.", getNodeId());
            return false;
        }
        if (!initMetaStorage()) {
            LOG.error("Node {} initMetaStorage failed.", getNodeId());
            return false;
        }
        // NOTE(review): LogId(0, 0) is presumably the bootstrap id the FSM caller starts from - confirm in initFSMCaller.
        if (!initFSMCaller(new LogId(0, 0))) {
            LOG.error("Node {} initFSMCaller failed.", getNodeId());
            return false;
        }
        // The ballot box is wired to the FSM caller (as its waiter) and to the node's closure queue.
        this.ballotBox = new BallotBox();
        final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
        ballotBoxOpts.setWaiter(this.fsmCaller);
        ballotBoxOpts.setClosureQueue(this.closureQueue);
        if (!this.ballotBox.init(ballotBoxOpts)) {
            LOG.error("Node {} init ballotBox failed.", getNodeId());
            return false;
        }

        if (!initSnapshotStorage()) {
            LOG.error("Node {} initSnapshotStorage failed.", getNodeId());
            return false;
        }

        // Refuse to start when the log manager reports an inconsistent log.
        final Status st = this.logManager.checkConsistency();
        if (!st.isOk()) {
            LOG.error("Node {} is initialized with inconsistent log, status={}.", getNodeId(), st);
            return false;
        }
        this.conf = new ConfigurationEntry();
        this.conf.setId(new LogId());
        // If the log already contains entries, take the configuration from the log;
        // otherwise fall back to the initial configuration given in the options.
        if (this.logManager.getLastLogIndex() > 0) {
            checkAndSetConfiguration(false);
        }
        else {
            this.conf.setConf(this.options.getInitialConf());
            // initially set to max(priority of all nodes)
            this.targetPriority = getMaxPriorityOfNodes(this.conf.getConf().getPeers());
        }

        // An empty configuration is accepted (and only logged); a non-empty one must be valid.
        if (!this.conf.isEmpty()) {
            Requires.requireTrue(this.conf.isValid(), "Invalid conf: %s", this.conf);
        }
        else {
            LOG.info("Init node {} with empty conf.", this.serverId);
        }

        // Prepare the replicator group options here; the group itself is initialized further below,
        // after the RPC client service has been initialized.
        this.replicatorGroup = new ReplicatorGroupImpl();
        this.rpcClientService = new DefaultRaftClientService();
        final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions();
        rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs()));
        rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs());
        rgOpts.setLogManager(this.logManager);
        rgOpts.setBallotBox(this.ballotBox);
        rgOpts.setNode(this);
        rgOpts.setRaftRpcClientService(this.rpcClientService);
        // snapshotExecutor may be null at this point, in which case no snapshot storage is passed on.
        rgOpts.setSnapshotStorage(this.snapshotExecutor != null ? this.snapshotExecutor.getSnapshotStorage() : null);
        rgOpts.setRaftOptions(this.raftOptions);
        rgOpts.setTimerManager(this.options.getScheduler());

        // Adds metric registry to RPC service.
        this.options.setMetricRegistry(this.metrics.getMetricRegistry());

        // Use the last log index as the commit index and wait until the FSM caller has applied up to it.
        long commitIdx = logManager.getLastLogIndex();

        // Completed with the applied index once the FSM caller has caught up with commitIdx.
        CompletableFuture<Long> logApplyComplition = new CompletableFuture<>();

        if (commitIdx > fsmCaller.getLastAppliedIndex()) {
            // An anonymous class (not a lambda) is used so that 'this' refers to the listener itself,
            // which lets it unregister once the target index has been reached.
            LastAppliedLogIndexListener lnsr = new LastAppliedLogIndexListener() {
                @Override
                public void onApplied( long lastAppliedLogIndex) {
                    if (lastAppliedLogIndex >= commitIdx) {
                        logApplyComplition.complete(lastAppliedLogIndex);
                        fsmCaller.removeLastAppliedLogIndexListener(this);
                    }
                }
            };

            // Register the listener before notifying the FSM caller about the commit index.
            fsmCaller.addLastAppliedLogIndexListener(lnsr);

            fsmCaller.onCommitted(commitIdx);
        } else {
            // Nothing is pending: the FSM caller is already at (or past) the last log index.
            logApplyComplition.complete(fsmCaller.getLastAppliedIndex());
        }

        if (!this.rpcClientService.init(this.options)) {
            LOG.error("Fail to init rpc service.");
            return false;
        }
        this.replicatorGroup.init(new NodeId(this.groupId, this.serverId), rgOpts);

        this.readOnlyService = new ReadOnlyServiceImpl();
        final ReadOnlyServiceOptions rosOpts = new ReadOnlyServiceOptions();
        rosOpts.setFsmCaller(this.fsmCaller);
        rosOpts.setNode(this);
        rosOpts.setRaftOptions(this.raftOptions);
        rosOpts.setReadOnlyServiceDisruptor(opts.getReadOnlyServiceDisruptor());

        if (!this.readOnlyService.init(rosOpts)) {
            LOG.error("Fail to init readOnlyService.");
            return false;
        }

        // The remaining start-up steps run only after the pending log entries have been applied.
        logApplyComplition.whenComplete((committedIdx, err) -> {
            // NOTE(review): nothing visible in this method completes the future exceptionally, and on
            // error the callback only logs and then proceeds with the normal start-up - confirm this is intended.
            if (err != null) {
                LOG.error("Fail to apply committed updates.", err);
            }

            // set state to follower
            this.state = State.STATE_FOLLOWER;

            if (LOG.isInfoEnabled()) {
                LOG.info("Node {} init, term={}, lastLogId={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
                    this.logManager.getLastLogId(false), this.conf.getConf(), this.conf.getOldConf());
            }

            // The snapshot timer is started only when a snapshot executor exists and a positive interval is configured.
            if (this.snapshotExecutor != null && this.options.getSnapshotIntervalSecs() > 0) {
                LOG.debug("Node {} start snapshot timer, term={}.", getNodeId(), this.currTerm);
                this.snapshotTimer.start();
            }

            if (!this.conf.isEmpty()) {
                stepDown(this.currTerm, false, new Status());
            }

            // Now the raft node is started , have to acquire the writeLock to avoid race
            // conditions
            // NOTE(review): the lock is released here only in the else branch; electSelf() is presumably
            // responsible for releasing it on the other path - confirm against electSelf().
            this.writeLock.lock();
            if (this.conf.isStable() && this.conf.getConf().size() == 1 && this.conf.getConf().contains(this.serverId)) {
                // The group contains only this server which must be the LEADER, trigger
                // the timer immediately.
                electSelf();
            }
            else {
                this.writeLock.unlock();
            }

            // Completed with the commit index captured above, not with the applied index passed to this callback.
            applyCommittedFuture.complete(commitIdx);
        });

        return true;
    }