void runLeaderProcess()

in solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java [106:353]


  /**
   * Attempts to make the core named in {@code leaderProps} the leader of {@code shardId}.
   *
   * <p>The steps, in order: throttle via the core's leader throttle, optionally clear the current
   * leader in cluster state, wait for replicas (first election only), check shard terms, cancel any
   * running recovery, sync with peers, and finally delegate to {@code super.runLeaderProcess} and
   * publish the core as leader. Whenever the sync or registration fails the core rejoins the
   * election instead; whenever the core is gone or this context is closed the method simply
   * returns.
   *
   * @param weAreReplacement when {@code true} the wait for replicas to come up is skipped, a fixed
   *     2500 ms pause precedes the sync, and a TLOG replica replays its current log before taking
   *     over
   * @param pauseBeforeStart not read by this implementation; {@code 0} is always forwarded to the
   *     super implementation
   * @throws KeeperException propagated from the ZooKeeper-backed calls made here
   * @throws InterruptedException propagated from blocking calls made here
   */
  void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStart)
      throws KeeperException, InterruptedException {
    String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
    ActionThrottle lt;
    try (SolrCore core = cc.getCore(coreName)) {
      if (core == null) {
        // shutdown or removed
        return;
      }
      MDCLoggingContext.setCore(core);
      lt = core.getUpdateHandler().getSolrCoreState().getLeaderThrottle();
    }

    try {
      lt.minimumWaitBetweenActions();
      lt.markAttemptingAction();

      int leaderVoteWait = cc.getZkController().getLeaderVoteWait();

      log.debug(
          "Running the leader process for shard={} and weAreReplacement={} and leaderVoteWait={}",
          shardId,
          weAreReplacement,
          leaderVoteWait);
      if (zkController
              .getClusterState()
              .getCollection(collection)
              .getSlice(shardId)
              .getNumLeaderReplicas()
          > 1) {
        // Clear the leader in clusterstate. We only need to worry about this if there is actually
        // more than one replica.
        MapWriter m =
            ew ->
                ew.put(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower())
                    .put(ZkStateReader.SHARD_ID_PROP, shardId)
                    .put(ZkStateReader.COLLECTION_PROP, collection);
        if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
          distributedClusterStateUpdater.doSingleStateUpdate(
              DistributedClusterStateUpdater.MutatingCommand.SliceSetShardLeader,
              new ZkNodeProps(m),
              zkController.getSolrCloudManager(),
              zkStateReader);
        } else {
          zkController.getOverseer().getStateUpdateQueue().offer(m);
        }
      }

      if (!weAreReplacement) {
        waitForReplicasToComeUp(leaderVoteWait);
      }

      if (isClosed) {
        // Solr is shutting down or the ZooKeeper session expired while waiting for replicas. If the
        // latter, we cannot be sure we are still the leader, so we should bail out. The OnReconnect
        // handler will re-register the cores and handle a new leadership election.
        return;
      }

      Replica.Type replicaType;
      String coreNodeName;
      boolean setTermToMax = false;
      try (SolrCore core = cc.getCore(coreName)) {

        if (core == null) {
          return;
        }

        replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
        coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
        // should I be leader?
        ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
        if (zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
          if (!waitForEligibleBecomeLeaderAfterTimeout(
              zkShardTerms, coreNodeName, leaderVoteWait)) {
            rejoinLeaderElection(core);
            return;
          } else {
            // only log an error if this replica win the election
            setTermToMax = true;
          }
        }

        if (isClosed) {
          return;
        }

        log.info("I may be the new leader - try and sync");

        // we are going to attempt to be the leader
        // first cancel any current recovery
        core.getUpdateHandler().getSolrCoreState().cancelRecovery();

        if (weAreReplacement) {
          // wait a moment for any floating updates to finish
          try {
            Thread.sleep(2500);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
          }
        }

        PeerSync.PeerSyncResult result = null;
        boolean success = false;
        try {
          result = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
          success = result.isSuccess();
        } catch (Exception e) {
          log.error("Exception while trying to sync", e);
          // keep 'result' non-null so the fallback logic below can query it
          result = PeerSync.PeerSyncResult.failure();
        }

        UpdateLog ulog = core.getUpdateHandler().getUpdateLog();

        if (!success) {
          boolean hasRecentUpdates = false;
          if (ulog != null) {
            // TODO: we could optimize this if necessary
            try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
              hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
            }
          }

          if (!hasRecentUpdates) {
            // we failed sync, but we have no versions - we can't sync in that case
            // - we were active
            // before, so become leader anyway if no one else has any versions either
            if (result.getOtherHasVersions().orElse(false)) {
              log.info(
                  "We failed sync, but we have no versions - we can't sync in that case. But others have some versions, so we should not become leader");
              success = false;
            } else {
              log.info(
                  "We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
              success = true;
            }
          }
        }

        // solrcloud_debug
        if (log.isDebugEnabled()) {
          try {
            RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
            SolrIndexSearcher searcher = searchHolder.get();
            try {
              if (log.isDebugEnabled()) {
                log.debug(
                    "{} synched {}",
                    core.getCoreContainer().getZkController().getNodeName(),
                    searcher.count(new MatchAllDocsQuery()));
              }
            } finally {
              searchHolder.decref();
            }
          } catch (Exception e) {
            log.error("Error in solrcloud_debug block", e);
          }
        }
        if (!success) {
          rejoinLeaderElection(core);
          return;
        }
      }

      if (!isClosed) {
        try {
          if (replicaType.replicateFromLeader) {
            // stop replicate from old leader
            zkController.stopReplicationFromLeader(coreName);
          }
          if (replicaType == Replica.Type.TLOG) {
            if (weAreReplacement) {
              try (SolrCore core = cc.getCore(coreName)) {
                if (core == null) {
                  // core was shut down or removed in the meantime; same handling as the other
                  // getCore() call sites instead of tripping a NullPointerException below
                  return;
                }
                Future<UpdateLog.RecoveryInfo> future =
                    core.getUpdateHandler().getUpdateLog().recoverFromCurrentLog();
                if (future != null) {
                  log.info("Replaying tlog before become new leader");
                  future.get();
                } else {
                  log.info("New leader does not have old tlog to replay");
                }
              }
            }
          }
          // in case of leaderVoteWait timeout, a replica with lower term can win the election
          if (setTermToMax) {
            // argument order must follow the placeholders: replica name first, explanation second
            log.error(
                "WARNING: Potential data loss -- Replica {} became leader after timeout (leaderVoteWait) {}",
                coreNodeName,
                "without being up-to-date with the previous leader");
            zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreNodeName);
          }
          super.runLeaderProcess(weAreReplacement, 0);
          try (SolrCore core = cc.getCore(coreName)) {
            if (core != null) {
              core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
              publishActiveIfRegisteredAndNotActive(core);
            } else {
              return;
            }
          }
          if (log.isInfoEnabled()) {
            log.info(
                "I am the new leader: {} {}", ZkCoreNodeProps.getCoreUrl(leaderProps), shardId);
          }

          // we made it as leader - send any recovery requests we need to
          syncStrategy.requestRecoveries();

        } catch (SessionExpiredException e) {
          // keep the original exception as the cause so the ZooKeeper stack trace is not lost
          throw new SolrException(
              ErrorCode.SERVER_ERROR,
              "ZK session expired - cancelling election for " + collection + " " + shardId,
              e);
        } catch (Exception e) {
          log.error("There was a problem trying to register as the leader", e);

          try (SolrCore core = cc.getCore(coreName)) {

            if (core == null) {
              if (log.isDebugEnabled()) {
                log.debug(
                    "SolrCore not found: {} in {}",
                    coreName,
                    CloudUtil.getLoadedCoreNamesAsString(cc));
              }
              return;
            }

            core.getCoreDescriptor().getCloudDescriptor().setLeader(false);

            // we could not publish ourselves as leader - try and rejoin election
            try {
              rejoinLeaderElection(core);
            } catch (SessionExpiredException exc) {
              throw new SolrException(
                  ErrorCode.SERVER_ERROR,
                  "ZK session expired - cancelling election for " + collection + " " + shardId,
                  exc);
            }
          }
        }
      } else {
        cancelElection();
      }
    } finally {
      // undo the MDCLoggingContext.setCore(...) done at the top of this method
      MDCLoggingContext.clear();
    }
  }