IndexFetchResult fetchLatestIndex()

in solr/core/src/java/org/apache/solr/handler/IndexFetcher.java [405:783]


  IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload)
      throws IOException, InterruptedException {

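    // Local state for this fetch attempt: the temporary download directory, the current index
    // directory, and flags tracking whether cleanup has already run, whether the new index was
    // installed successfully, and whether the temporary directory should be deleted afterwards.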
    this.clearLocalIndexFirst = false;
    boolean cleanupDone = false;
    boolean successfulInstall = false;
    markReplicationStart();
    Directory tmpIndexDir = null;
    String tmpIndexDirPath;
    Directory indexDir = null;
    String indexDirPath;
    boolean deleteTmpIdxDir = true;
    Path tmpTlogDir = null;

    if (!solrCore.getSolrCoreState().getLastReplicateIndexSuccess()) {
      // If the last replication was not a success, force a full replication. Once we are a bit
      // more confident, we may want to try a partial replication when the error is connection
      // related or similar, but we have to be careful.
      forceReplication = true;
      log.info("Last replication failed, so I'll force replication");
    }

    try {
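      // When fetching from the shard leader, verify that the leader replica is a different core,
      // is ACTIVE, and is hosted on a live node before proceeding, and pick up any change to the
      // leader's core URL.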
      if (fetchFromLeader) {
        assert !solrCore.isClosed() : "Replication should be stopped before closing the core";
        Replica replica = getLeaderReplica();
        CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
        if (cd.getCoreNodeName().equals(replica.getName())) {
          return IndexFetchResult.EXPECTING_NON_LEADER;
        }
        if (replica.getState() != Replica.State.ACTIVE) {
          if (log.isInfoEnabled()) {
            log.info(
                "Replica {} is leader but its state is {}, skipping replication",
                replica.getName(),
                replica.getState());
          }
          return IndexFetchResult.LEADER_IS_NOT_ACTIVE;
        }
        if (!solrCore
            .getCoreContainer()
            .getZkController()
            .getClusterState()
            .liveNodesContain(replica.getNodeName())) {
          if (log.isInfoEnabled()) {
            log.info(
                "Replica {} is leader but it's not hosted on a live node, skipping replication",
                replica.getName());
          }
          return IndexFetchResult.LEADER_IS_NOT_ACTIVE;
        }
        if (!replica.getCoreUrl().equals(leaderCoreUrl)) {
          setLeaderCoreUrl(replica.getCoreUrl());
          log.info("Updated leaderUrl to {}", leaderCoreUrl);
          // TODO: Do we need to set forceReplication = true?
        } else {
          log.debug("leaderUrl didn't change");
        }
      }
      // get the current 'replicateable' index version in the leader
      NamedList<?> response;
      try {
        response = getLatestVersion();
      } catch (Exception e) {
        final String errorMsg = e.toString();
        if (StrUtils.isNotNullOrEmpty(errorMsg) && errorMsg.contains(INTERRUPT_RESPONSE_MESSAGE)) {
          log.warn(
              "Leader at: {} is not available. Index fetch failed by interrupt. Exception: {}",
              leaderCoreUrl,
              errorMsg);
          return new IndexFetchResult(IndexFetchResult.FAILED_BY_INTERRUPT_MESSAGE, false, e);
        } else {
          log.warn(
              "Leader at: {} is not available. Index fetch failed by exception: {}",
              leaderCoreUrl,
              errorMsg);
          return new IndexFetchResult(IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e);
        }
      }

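      // The leader reports its replicateable index version and generation; they are compared
      // against the follower's latest commit below.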
      long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
      long latestGeneration = (Long) response.get(GENERATION);

      log.info("Leader's generation: {}", latestGeneration);
      log.info("Leader's version: {}", latestVersion);

      // TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. no
      // side-car indexes)
      IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit();
      if (commit == null) {
        // Presumably the IndexWriter hasn't been opened yet, and hence the deletion policy hasn't
        // been updated with commit points
        RefCounted<SolrIndexSearcher> searcherRefCounted = null;
        try {
          searcherRefCounted = solrCore.getNewestSearcher(false);
          if (searcherRefCounted == null) {
            log.warn("No open searcher found - fetch aborted");
            return IndexFetchResult.NO_INDEX_COMMIT_EXIST;
          }
          commit = searcherRefCounted.get().getIndexReader().getIndexCommit();
        } finally {
          if (searcherRefCounted != null) searcherRefCounted.decref();
        }
      }

      if (log.isInfoEnabled()) {
        log.info("Follower's generation: {}", commit.getGeneration());
        log.info(
            "Follower's version: {}",
            IndexDeletionPolicyWrapper.getCommitTimestamp(commit)); // nowarn
      }

      // Leader's version is 0 and generation is 0 - not open for replication
      if (latestVersion == 0L && latestGeneration == 0L) {
        log.info("Leader's version is 0 and generation is 0 - not open for replication");
        return IndexFetchResult.LEADER_IS_NOT_ACTIVE;
      }

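      // The leader has an empty index (version 0). If the follower has data, clear it so the two
      // match, then either open a new searcher or issue a commit depending on
      // skipCommitOnLeaderVersionZero.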
      if (latestVersion == 0L) {
        if (IndexDeletionPolicyWrapper.getCommitTimestamp(commit) != 0L) {
          // since we won't get the files for an empty index,
          // we just clear ours and commit
          log.info("New index in Leader. Deleting mine...");
          RefCounted<IndexWriter> iw =
              solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(solrCore);
          try {
            iw.get().deleteAll();
          } finally {
            iw.decref();
          }
          assert TestInjection.injectDelayBeforeFollowerCommitRefresh();
          if (skipCommitOnLeaderVersionZero) {
            openNewSearcherAndUpdateCommitPoint();
          } else {
            SolrQueryRequest req = new LocalSolrQueryRequest(solrCore, new ModifiableSolrParams());
            solrCore.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
          }
        }

        // there is nothing to be replicated
        successfulInstall = true;
        log.debug("Nothing to replicate, leader's version is 0");
        return IndexFetchResult.LEADER_VERSION_ZERO;
      }

      // TODO: Should we be comparing timestamps (across machines) here?
      if (!forceReplication
          && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
        // leader and follower are already in sync just return
        log.info("Follower in sync with leader.");
        successfulInstall = true;
        return IndexFetchResult.ALREADY_IN_SYNC;
      }
      log.info("Starting replication process");
      // get the list of files first
      fetchFileList(latestGeneration);
      // this can happen if the commit point is deleted before we fetch the file list.
      if (filesToDownload.isEmpty()) {
        return IndexFetchResult.PEER_INDEX_COMMIT_DELETED;
      }
      if (log.isInfoEnabled()) {
        log.info("Number of files in latest index in leader: {}", filesToDownload.size());
      }

      // Create the sync service
      fsyncService =
          ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("fsyncService"));
      // use a synchronized list because the list is read by other threads (to show details)
      filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
      // If the leader's generation is older than the follower's, the two indexes are not
      // compatible for an incremental copy; a new index directory must be created and all the
      // files copied over.
      boolean isFullCopyNeeded =
          IndexDeletionPolicyWrapper.getCommitTimestamp(commit) >= latestVersion
              || commit.getGeneration() >= latestGeneration
              || forceReplication;

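      // Download into a timestamped temporary directory ("index.<timestamp>") under the data dir;
      // it is only swapped in (full copy) or merged into the live index (incremental copy) once
      // the download succeeds.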
      String timestamp = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
      String tmpIdxDirName = "index." + timestamp;
      tmpIndexDirPath = solrCore.getDataDir() + tmpIdxDirName;

      tmpIndexDir =
          solrCore
              .getDirectoryFactory()
              .get(
                  tmpIndexDirPath,
                  DirContext.DEFAULT,
                  solrCore.getSolrConfig().indexConfig.lockType);

      // current index dir...
      indexDirPath = solrCore.getIndexDir();
      indexDir =
          solrCore
              .getDirectoryFactory()
              .get(indexDirPath, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);

      try {

        // Compare all the index files from the leader against the index files on disk to see if
        // there is a mismatch in the metadata. If the metadata differs for the same index file,
        // download the entire index again (except when a differential copy is applicable).
        if (!isFullCopyNeeded && isIndexStale(indexDir)) {
          isFullCopyNeeded = true;
        }

        if (!isFullCopyNeeded && !fetchFromLeader) {
          // a searcher might be using some flushed but not committed segments
          // because of soft commits (which open a searcher on IW's data)
          // so we need to close the existing searcher on the last commit
          // and wait until we are able to clean up all unused lucene files
          if (solrCore.getCoreContainer().isZooKeeperAware()) {
            solrCore.closeSearcher();
          }

          // rollback and reopen index writer and wait until all unused files
          // are successfully deleted
          solrCore.getUpdateHandler().newIndexWriter(true);
          RefCounted<IndexWriter> writer =
              solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
          try {
            IndexWriter indexWriter = writer.get();
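            // Ask the IndexWriter to delete unused files and poll (up to 30 one-second sleeps)
            // until no stale files from the old commit remain; if they still cannot be removed,
            // fall back to a full copy.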
            int c = 0;
            indexWriter.deleteUnusedFiles();
            while (hasUnusedFiles(indexDir, commit)) {
              indexWriter.deleteUnusedFiles();
              log.info(
                  "Sleeping for 1000ms to wait for unused lucene index files to be delete-able");
              Thread.sleep(1000);
              c++;
              if (c >= 30) {
                log.warn(
                    "IndexFetcher unable to cleanup unused lucene index files so we must do a full copy instead");
                isFullCopyNeeded = true;
                break;
              }
            }
            if (c > 0) {
              log.info(
                  "IndexFetcher slept for {}ms for unused lucene index files to be delete-able",
                  c * 1000);
            }
          } finally {
            writer.decref();
          }
        }
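        // Set to true when modified config files are downloaded from the leader, so that the core
        // is reloaded after the new index is installed.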
        boolean reloadCore = false;

        try {
          // we have to be careful and do this after we know isFullCopyNeeded won't be flipped
          if (!isFullCopyNeeded) {
            solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(solrCore, true);
          }

          log.info("Starting download (fullCopy={}) to {}", isFullCopyNeeded, tmpIndexDir);
          successfulInstall = false;

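          // Download the index files from the leader into the temporary directory and log how
          // long the transfer took.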
          long bytesDownloaded =
              downloadIndexFiles(
                  isFullCopyNeeded,
                  indexDir,
                  tmpIndexDir,
                  indexDirPath,
                  tmpIndexDirPath,
                  latestGeneration);
          final long timeTakenSeconds = getReplicationTimeElapsed();
          final Long bytesDownloadedPerSecond =
              (timeTakenSeconds != 0 ? bytesDownloaded / timeTakenSeconds : null);
          log.info(
              "Total time taken for download (fullCopy={},bytesDownloaded={}) : {} secs ({} bytes/sec) to {}",
              isFullCopyNeeded,
              bytesDownloaded,
              timeTakenSeconds,
              bytesDownloadedPerSecond,
              tmpIndexDir);

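          // If the leader has modified config files, download them and mark the core for reload.
          // In either case, install the index by pointing the core at the new directory (full
          // copy) or by moving the downloaded files into the existing index (incremental copy).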
          Collection<Map<String, Object>> modifiedConfFiles =
              getModifiedConfFiles(confFilesToDownload);
          if (!modifiedConfFiles.isEmpty()) {
            reloadCore = true;
            downloadConfFiles(confFilesToDownload, latestGeneration);
            if (isFullCopyNeeded) {
              successfulInstall = solrCore.modifyIndexProps(tmpIdxDirName);
              if (successfulInstall) deleteTmpIdxDir = false;
            } else {
              successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
            }
            if (successfulInstall) {
              if (isFullCopyNeeded) {
                // let the system know we are changing directories and the old one
                // may be closed
                if (indexDir != null) {
                  if (!this.clearLocalIndexFirst) { // it was closed earlier
                    solrCore.getDirectoryFactory().doneWithDirectory(indexDir);
                  }
                  // Clean up all index files not associated with any *named* snapshot.
                  solrCore.deleteNonSnapshotIndexFiles(indexDirPath);
                }
              }

              log.info("Configuration files are modified, core will be reloaded");
              // write to a file time of replication and conf files.
              logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);
            }
          } else {
            terminateAndWaitFsyncService();
            if (isFullCopyNeeded) {
              successfulInstall = solrCore.modifyIndexProps(tmpIdxDirName);
              if (successfulInstall) deleteTmpIdxDir = false;
            } else {
              successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
            }
            if (successfulInstall) {
              logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);
            }
          }
        } finally {
          solrCore.searchEnabled = true;
          solrCore.indexEnabled = true;
          if (!isFullCopyNeeded) {
            solrCore.getUpdateHandler().getSolrCoreState().openIndexWriter(solrCore);
          }
        }

        // we must reload the core after we open the IW back up
        if (successfulInstall && (reloadCore || forceCoreReload)) {
          if (log.isInfoEnabled()) {
            log.info("Reloading SolrCore {}", solrCore.getName());
          }
          reloadCore();
        }

        if (successfulInstall) {
          if (isFullCopyNeeded) {
            // let the system know we are changing directories and the old one
            // may be closed
            if (indexDir != null) {
              log.info("removing old index directory {}", indexDir);
              solrCore.getDirectoryFactory().doneWithDirectory(indexDir);
              solrCore.getDirectoryFactory().remove(indexDir);
            }
          }
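          // A full copy installed a new index directory, so open a fresh IndexWriter on it.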
          if (isFullCopyNeeded) {
            solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded);
          }

          openNewSearcherAndUpdateCommitPoint();
        }

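        // If an incremental attempt failed and a full copy was not already forced, clean up and
        // retry once with a full index replication.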
        if (!isFullCopyNeeded && !forceReplication && !successfulInstall) {
          cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, tmpTlogDir, successfulInstall);
          cleanupDone = true;
          // we try with a full copy of the index
          log.warn(
              "Replication attempt was not successful - trying a full index replication reloadCore={}",
              reloadCore);
          successfulInstall = fetchLatestIndex(true, reloadCore).getSuccessful();
        }

        markReplicationStop();
        return successfulInstall
            ? IndexFetchResult.INDEX_FETCH_SUCCESS
            : IndexFetchResult.INDEX_FETCH_FAILURE;
      } catch (ReplicationHandlerException e) {
        log.error("User aborted Replication", e);
        return new IndexFetchResult(IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e);
      } catch (SolrException e) {
        throw e;
      } catch (InterruptedException e) {
        throw (InterruptedException)
            (new InterruptedException("Index fetch interrupted").initCause(e));
      } catch (Exception e) {
        throw new SolrException(ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);
      }
    } finally {
      if (!cleanupDone) {
        cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, tmpTlogDir, successfulInstall);
      }
    }
  }