public final void doSyncOrReplicateRecovery()

in solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java [517:772]


  /**
   * Recovers {@code core} from the current shard leader. It first tries a PeerSync (when
   * permitted) and falls back to full index replication. Attempts repeat until one succeeds, the
   * current thread is interrupted, or this strategy is closed.
   *
   * <p>Behavior worth knowing:
   *
   * <ul>
   *   <li>If the core has no {@link UpdateLog}, recovery is reported as failed and the method
   *       returns immediately.
   *   <li>TLOG replicas never attempt PeerSync. Neither does a core that finds a leftover buffer
   *       tlog from an earlier, incomplete replication recovery.
   *   <li>When recovering after startup, PeerSync uses the versions recorded at startup instead
   *       of the current recent versions.
   *   <li>For replica types that replicate from the leader, leader replication is stopped before
   *       recovery and restarted after a successful recovery.
   *   <li>If this core becomes the leader during recovery, it is published as ACTIVE and the
   *       method returns.
   *   <li>On success the core is published as ACTIVE, {@code close} is set, and the recovery
   *       listener is notified.
   * </ul>
   *
   * @param core the core to recover
   * @throws Exception propagated from calls made outside the per-attempt error handling (for
   *     example {@code stopReplicationFromLeader} or {@code waitBetweenRecoveries}); failures
   *     inside an attempt are logged and retried
   */
  public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
    final RTimer timer = new RTimer();
    boolean successfulRecovery = false;

    UpdateLog ulog;
    ulog = core.getUpdateHandler().getUpdateLog();
    if (ulog == null) {
      log.error("No UpdateLog found - cannot recover.");
      recoveryFailed(zkController, this.coreDescriptor);
      return;
    }

    // we temporary ignore peersync for tlog replicas
    boolean firstTime = replicaType != Replica.Type.TLOG;

    List<Long> recentVersions;
    try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
      recentVersions = recentUpdates.getVersions(ulog.getNumRecordsToKeep());
    } catch (Exception e) {
      log.error("Corrupt tlog - ignoring.", e);
      recentVersions = new ArrayList<>(0);
    }

    List<Long> startingVersions = ulog.getStartingVersions();

    if (startingVersions != null && recoveringAfterStartup) {
      // Diagnostic logging only: report how many versions arrived after startup, and the
      // version ranges of the current and startup lists.
      try {
        int oldIdx = 0; // index of the start of the old list in the current list
        long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;

        for (; oldIdx < recentVersions.size(); oldIdx++) {
          if (recentVersions.get(oldIdx) == firstStartingVersion) break;
        }

        if (oldIdx > 0) {
          log.info("Found new versions added after startup: num=[{}]", oldIdx);
          if (log.isInfoEnabled()) {
            log.info(
                "currentVersions size={} range=[{} to {}]",
                recentVersions.size(),
                recentVersions.get(0),
                recentVersions.get(recentVersions.size() - 1));
          }
        }

        if (startingVersions.isEmpty()) {
          log.info("startupVersions is empty");
        } else {
          if (log.isInfoEnabled()) {
            log.info(
                "startupVersions size={} range=[{} to {}]",
                startingVersions.size(),
                startingVersions.get(0),
                startingVersions.get(startingVersions.size() - 1));
          }
        }
      } catch (Exception e) {
        log.error("Error getting recent versions.", e);
        // NOTE: this block only runs when recoveringAfterStartup, and the block below then
        // replaces recentVersions with startingVersions, so this fallback is always overwritten.
        recentVersions = new ArrayList<>(0);
      }
    }

    if (recoveringAfterStartup) {
      // if we're recovering after startup (i.e. we have been down), then we need to know what the
      // last versions were when we went down. We may have received updates since then.
      // NOTE(review): if getStartingVersions() returned null, recentVersions becomes null here;
      // confirm PeerSyncWithLeader.sync tolerates that.
      recentVersions = startingVersions;
      try {
        if (ulog.existOldBufferLog()) {
          // this means we were previously doing a full index replication that probably didn't
          // complete and buffering updates in the meantime.
          log.info(
              "Looks like a previous replication recovery did not complete - skipping peer sync.");
          firstTime = false; // skip peersync
        }
      } catch (Exception e) {
        log.error("Error trying to get ulog starting operation.", e);
        firstTime = false; // skip peersync
      }
    }

    if (replicaType.replicateFromLeader) {
      zkController.stopReplicationFromLeader(coreName);
    }

    final String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
    // don't use interruption or it will close channels though
    while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) {
      try {
        CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
        final Replica leader = pingLeader(ourUrl, this.coreDescriptor, true);
        if (isClosed()) {
          log.info("RecoveryStrategy has been closed");
          break;
        }

        // The cluster state names us as leader, but our local descriptor has not caught up yet.
        boolean isLeader = leader.getCoreUrl().equals(ourUrl);
        if (isLeader && !cloudDesc.isLeader()) {
          throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
        }
        if (cloudDesc.isLeader()) {
          // we are now the leader - no one else must have been suitable
          log.warn("We have not yet recovered - but we are now the leader!");
          log.info("Finished recovery process.");
          zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
          return;
        }

        log.info("Begin buffering updates. core=[{}]", coreName);
        // recalling buffer updates will drop the old buffer tlog
        ulog.bufferUpdates();

        if (log.isInfoEnabled()) {
          log.info(
              "Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]",
              core.getName(),
              leader.getCoreUrl(),
              ourUrl);
        }
        zkController.publish(this.coreDescriptor, Replica.State.RECOVERING);

        final Slice slice =
            zkStateReader
                .getClusterState()
                .getCollection(cloudDesc.getCollectionName())
                .getSlice(cloudDesc.getShardId());

        cancelPrepRecoveryCmd();

        if (isClosed()) {
          log.info("RecoveryStrategy has been closed");
          break;
        }

        sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getCoreName(), slice);

        if (isClosed()) {
          log.info("RecoveryStrategy has been closed");
          break;
        }

        // we wait a bit so that any updates on the leader
        // that started before they saw recovering state
        // are sure to have finished (see SOLR-7141 for
        // discussion around current value)
        // TODO since SOLR-11216, we probably won't need this
        try {
          Thread.sleep(waitForUpdatesWithStaleStatePauseMilliSeconds);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }

        // first thing we just try to sync
        if (firstTime) {
          firstTime = false; // only try sync the first time through the loop
          if (log.isInfoEnabled()) {
            log.info(
                "Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]",
                leader.getCoreUrl(),
                recoveringAfterStartup);
          }
          boolean syncSuccess;
          try (PeerSyncWithLeader peerSyncWithLeader =
              new PeerSyncWithLeader(core, leader.getCoreUrl(), ulog.getNumRecordsToKeep())) {
            syncSuccess = peerSyncWithLeader.sync(recentVersions).isSuccess();
          }
          if (syncSuccess) {
            SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
            try {
              // force open a new searcher
              core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
            } finally {
              // close the request even if the commit throws, so it is never leaked
              req.close();
            }
            log.info("PeerSync stage of recovery was successful.");

            // solrcloud_debug
            cloudDebugLog(core, "synced");

            log.info("Replaying updates buffered during PeerSync.");
            replay(core);

            // sync success
            successfulRecovery = true;
            break;
          }

          log.info("PeerSync Recovery was not successful - trying replication.");
        }

        if (isClosed()) {
          log.info("RecoveryStrategy has been closed");
          break;
        }

        log.info("Starting Replication Recovery.");

        try {

          replicate(zkController.getNodeName(), core, leader);

          if (isClosed()) {
            log.info("RecoveryStrategy has been closed");
            break;
          }

          replay(core);

          if (isClosed()) {
            log.info("RecoveryStrategy has been closed");
            break;
          }

          log.info("Replication Recovery was successful.");
          successfulRecovery = true;
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          log.warn("Recovery was interrupted", e);
          // mark closed so the retry loop stops
          close = true;
        } catch (Exception e) {
          log.error("Error while trying to recover", e);
        }

      } catch (Exception e) {
        log.error("Error while trying to recover. core={}", coreName, e);
      } finally {
        // Runs for every exit from the attempt, including the PeerSync-success break.
        if (successfulRecovery) {
          log.info("Registering as Active after recovery.");
          try {
            if (replicaType.replicateFromLeader) {
              zkController.startReplicationFromLeader(coreName, true);
            }
            zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
          } catch (Exception e) {
            log.error("Could not publish as ACTIVE after successful recovery", e);
            // treat a failed ACTIVE publish as a failed recovery so the loop retries
            successfulRecovery = false;
          }

          if (successfulRecovery) {
            close = true;
            recoveryListener.recovered();
          }
        }
      }

      if (!successfulRecovery) {
        if (waitBetweenRecoveries(core.getName())) break;
      }
    }

    if (log.isInfoEnabled()) {
      log.info(
          "Finished recovery process, successful=[{}] msTimeTaken={}",
          successfulRecovery,
          timer.getTime());
    }
  }