in solr/core/src/java/org/apache/solr/handler/IndexFetcher.java [405:783]
/**
 * Fetches the latest replicable index from the leader and installs it into this core.
 *
 * <p>Outline of the steps, in the order they appear below:
 *
 * <ol>
 *   <li>When {@code fetchFromLeader} is set, check that the leader replica is a different core,
 *       is ACTIVE and lives on a live node, and refresh {@code leaderCoreUrl} if it changed.
 *   <li>Ask the leader for its latest index version and generation and compare them with the
 *       local commit point, returning early when there is nothing to do.
 *   <li>Fetch the file list, decide between a full copy (into a new {@code index.<timestamp>}
 *       directory) and an incremental copy (files moved into the current index directory), and
 *       download the index files and any modified configuration files.
 *   <li>Install the result, optionally reload the core, and open a new searcher.
 *   <li>If an incremental attempt did not install successfully, retry once by calling this
 *       method recursively with {@code forceReplication = true}.
 * </ol>
 *
 * <p>{@code cleanup(...)} is invoked exactly once per invocation for every path that enters the
 * outer {@code try}: either explicitly before the full-copy retry or from the outer {@code
 * finally}.
 *
 * @param forceReplication when {@code true}, the "already in sync" shortcut is skipped and a full
 *     copy is performed; it is also switched to {@code true} internally when the core state
 *     reports that the last replication was not successful
 * @param forceCoreReload when {@code true}, the core is reloaded after a successful install even
 *     if no configuration files were modified
 * @return one of the {@code IndexFetchResult} outcomes returned below: {@code
 *     EXPECTING_NON_LEADER}, {@code LEADER_IS_NOT_ACTIVE}, {@code NO_INDEX_COMMIT_EXIST}, {@code
 *     LEADER_VERSION_ZERO}, {@code ALREADY_IN_SYNC}, {@code PEER_INDEX_COMMIT_DELETED}, {@code
 *     INDEX_FETCH_SUCCESS}, {@code INDEX_FETCH_FAILURE}, or a new failed result carrying {@code
 *     FAILED_BY_INTERRUPT_MESSAGE} / {@code FAILED_BY_EXCEPTION_MESSAGE} and the causing exception
 * @throws IOException declared by the signature; note that exceptions raised inside the outer
 *     {@code try} that reach the generic {@code catch (Exception)} clause are rethrown wrapped in
 *     a {@code SolrException} with {@code ErrorCode.SERVER_ERROR}
 * @throws InterruptedException if the fetch is interrupted; a new exception with the message
 *     "Index fetch interrupted" is thrown and the original is attached as its cause
 */
IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload)
throws IOException, InterruptedException {
// Reset per-run state. Within this method clearLocalIndexFirst is only read (after a successful
// full copy with modified conf files).
// NOTE(review): presumably a download helper may set it to true — confirm.
this.clearLocalIndexFirst = false;
// Set to true once cleanup() has been run explicitly (just before the full-copy retry) so the
// outer finally block does not run it a second time.
boolean cleanupDone = false;
// Outcome flag; it is handed to cleanup() and decides the final SUCCESS/FAILURE result.
boolean successfulInstall = false;
markReplicationStart();
Directory tmpIndexDir = null;
String tmpIndexDirPath;
Directory indexDir = null;
String indexDirPath;
// Stays true unless a full copy is installed via modifyIndexProps(); passed to cleanup().
boolean deleteTmpIdxDir = true;
// NOTE(review): never assigned in this method, so cleanup() always receives null here.
Path tmpTlogDir = null;
if (!solrCore.getSolrCoreState().getLastReplicateIndexSuccess()) {
// If the last replication did not succeed, force a full replication. Once we are more
// confident we may want to try a partial replication when the error was merely connection
// related, but we have to be careful.
forceReplication = true;
log.info("Last replication failed, so I'll force replication");
}
try {
if (fetchFromLeader) {
assert !solrCore.isClosed() : "Replication should be stopped before closing the core";
Replica replica = getLeaderReplica();
CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
// This core's node name matches the leader replica's name: nothing to fetch from ourselves.
if (cd.getCoreNodeName().equals(replica.getName())) {
return IndexFetchResult.EXPECTING_NON_LEADER;
}
// Skip replication while the leader replica is not in the ACTIVE state.
if (replica.getState() != Replica.State.ACTIVE) {
if (log.isInfoEnabled()) {
log.info(
"Replica {} is leader but it's state is {}, skipping replication",
replica.getName(),
replica.getState());
}
return IndexFetchResult.LEADER_IS_NOT_ACTIVE;
}
// Skip replication when the leader's node is not among the live nodes of the cluster state.
if (!solrCore
.getCoreContainer()
.getZkController()
.getClusterState()
.liveNodesContain(replica.getNodeName())) {
if (log.isInfoEnabled()) {
log.info(
"Replica {} is leader but it's not hosted on a live node, skipping replication",
replica.getName());
}
return IndexFetchResult.LEADER_IS_NOT_ACTIVE;
}
// Keep the cached leader URL in sync with the current leader replica.
if (!replica.getCoreUrl().equals(leaderCoreUrl)) {
setLeaderCoreUrl(replica.getCoreUrl());
log.info("Updated leaderUrl to {}", leaderCoreUrl);
// TODO: Do we need to set forceReplication = true?
} else {
log.debug("leaderUrl didn't change");
}
}
// get the current 'replicateable' index version in the leader
NamedList<?> response;
try {
response = getLatestVersion();
} catch (Exception e) {
// Any failure to reach the leader is reported as a failed result instead of being thrown.
// An interrupt is recognised by looking for INTERRUPT_RESPONSE_MESSAGE in e.toString().
final String errorMsg = e.toString();
if (StrUtils.isNotNullOrEmpty(errorMsg) && errorMsg.contains(INTERRUPT_RESPONSE_MESSAGE)) {
log.warn(
"Leader at: {} is not available. Index fetch failed by interrupt. Exception: {}",
leaderCoreUrl,
errorMsg);
return new IndexFetchResult(IndexFetchResult.FAILED_BY_INTERRUPT_MESSAGE, false, e);
} else {
log.warn(
"Leader at: {} is not available. Index fetch failed by exception: {}",
leaderCoreUrl,
errorMsg);
return new IndexFetchResult(IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e);
}
}
// Both values are cast to Long and unboxed; a missing or differently typed entry would raise
// a runtime exception that ends up in the generic catch clause at the bottom.
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
long latestGeneration = (Long) response.get(GENERATION);
log.info("Leader's generation: {}", latestGeneration);
log.info("Leader's version: {}", latestVersion);
// TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. no
// side-car indexes)
IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit();
if (commit == null) {
// Presumably the IndexWriter hasn't been opened yet, and hence the deletion policy hasn't
// been updated with commit points. Fall back to the commit of the newest searcher's reader.
RefCounted<SolrIndexSearcher> searcherRefCounted = null;
try {
searcherRefCounted = solrCore.getNewestSearcher(false);
if (searcherRefCounted == null) {
log.warn("No open searcher found - fetch aborted");
return IndexFetchResult.NO_INDEX_COMMIT_EXIST;
}
commit = searcherRefCounted.get().getIndexReader().getIndexCommit();
} finally {
// Always release the searcher reference taken above.
if (searcherRefCounted != null) searcherRefCounted.decref();
}
}
if (log.isInfoEnabled()) {
log.info("Follower's generation: {}", commit.getGeneration());
log.info(
"Follower's version: {}",
IndexDeletionPolicyWrapper.getCommitTimestamp(commit)); // nowarn
}
// Leader's version is 0 and generation is 0 - not open for replication.
// successfulInstall is still false on this path, so cleanup() is told the run did not succeed.
if (latestVersion == 0L && latestGeneration == 0L) {
log.info("Leader's version is 0 and generation is 0 - not open for replication");
return IndexFetchResult.LEADER_IS_NOT_ACTIVE;
}
// Leader version 0 with a non-zero generation: the leader has nothing to replicate.
if (latestVersion == 0L) {
if (IndexDeletionPolicyWrapper.getCommitTimestamp(commit) != 0L) {
// since we won't get the files for an empty index,
// we just clear ours and commit
log.info("New index in Leader. Deleting mine...");
RefCounted<IndexWriter> iw =
solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(solrCore);
try {
iw.get().deleteAll();
} finally {
iw.decref();
}
// Test-only hook; only evaluated when assertions are enabled.
assert TestInjection.injectDelayBeforeFollowerCommitRefresh();
// Either just open a new searcher and update the commit point, or issue a commit through
// the update handler, depending on skipCommitOnLeaderVersionZero.
if (skipCommitOnLeaderVersionZero) {
openNewSearcherAndUpdateCommitPoint();
} else {
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore, new ModifiableSolrParams());
solrCore.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
}
}
// there is nothing to be replicated
successfulInstall = true;
log.debug("Nothing to replicate, leader's version is 0");
return IndexFetchResult.LEADER_VERSION_ZERO;
}
// TODO: Should we be comparing timestamps (across machines) here?
if (!forceReplication
&& IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
// leader and follower are already in sync, just return
log.info("Follower in sync with leader.");
successfulInstall = true;
return IndexFetchResult.ALREADY_IN_SYNC;
}
log.info("Starting replication process");
// get the list of files first
// NOTE(review): presumably this populates filesToDownload and confFilesToDownload — confirm.
fetchFileList(latestGeneration);
// this can happen if the commit point is deleted before we fetch the file list.
if (filesToDownload.isEmpty()) {
return IndexFetchResult.PEER_INDEX_COMMIT_DELETED;
}
if (log.isInfoEnabled()) {
log.info("Number of files in latest index in leader: {}", filesToDownload.size());
}
// Create the sync service (a single-threaded executor named "fsyncService")
fsyncService =
ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("fsyncService"));
// use a synchronized list because the list is read by other threads (to show details)
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
// A full copy (new index directory, every file downloaded) is needed when the local commit is
// not older than the leader's, by timestamp or by generation, because the two indexes are then
// not compatible for an incremental copy. It is also needed when replication is forced.
boolean isFullCopyNeeded =
IndexDeletionPolicyWrapper.getCommitTimestamp(commit) >= latestVersion
|| commit.getGeneration() >= latestGeneration
|| forceReplication;
// The temporary download directory is named "index.<timestamp>".
// NOTE(review): the path is built by plain string concatenation, so this assumes getDataDir()
// ends with a path separator — confirm.
String timestamp = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
String tmpIdxDirName = "index." + timestamp;
tmpIndexDirPath = solrCore.getDataDir() + tmpIdxDirName;
tmpIndexDir =
solrCore
.getDirectoryFactory()
.get(
tmpIndexDirPath,
DirContext.DEFAULT,
solrCore.getSolrConfig().indexConfig.lockType);
// current index dir
indexDirPath = solrCore.getIndexDir();
indexDir =
solrCore
.getDirectoryFactory()
.get(indexDirPath, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
try {
// We will compare all the index files from the leader vs the index files on disk to see if
// there is a mismatch in the metadata. If there is a mismatch for the same index file then
// we download the entire index (except when differential copy is applicable) again.
if (!isFullCopyNeeded && isIndexStale(indexDir)) {
isFullCopyNeeded = true;
}
if (!isFullCopyNeeded && !fetchFromLeader) {
// a searcher might be using some flushed but not committed segments
// because of soft commits (which open a searcher on IW's data)
// so we need to close the existing searcher on the last commit
// and wait until we are able to clean up all unused lucene files
if (solrCore.getCoreContainer().isZooKeeperAware()) {
solrCore.closeSearcher();
}
// rollback and reopen index writer and wait until all unused files
// are successfully deleted
solrCore.getUpdateHandler().newIndexWriter(true);
// NOTE(review): null is passed for the core here, unlike the getIndexWriter(solrCore) call
// in the leader-version-zero branch above — confirm this is intentional.
RefCounted<IndexWriter> writer =
solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
try {
IndexWriter indexWriter = writer.get();
// c counts the one-second waits; after 30 of them we give up and do a full copy.
int c = 0;
indexWriter.deleteUnusedFiles();
while (hasUnusedFiles(indexDir, commit)) {
indexWriter.deleteUnusedFiles();
log.info(
"Sleeping for 1000ms to wait for unused lucene index files to be delete-able");
Thread.sleep(1000);
c++;
if (c >= 30) {
log.warn(
"IndexFetcher unable to cleanup unused lucene index files so we must do a full copy instead");
isFullCopyNeeded = true;
break;
}
}
if (c > 0) {
log.info(
"IndexFetcher slept for {}ms for unused lucene index files to be delete-able",
c * 1000);
}
} finally {
writer.decref();
}
}
// Set when modified configuration files were downloaded; triggers a core reload below and is
// forwarded as forceCoreReload to the full-copy retry.
boolean reloadCore = false;
try {
// we have to be careful and do this after we know isFullCopyNeeded won't be flipped:
// for an incremental copy the index writer is closed here and reopened in the finally block
if (!isFullCopyNeeded) {
solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(solrCore, true);
}
log.info("Starting download (fullCopy={}) to {}", isFullCopyNeeded, tmpIndexDir);
successfulInstall = false;
long bytesDownloaded =
downloadIndexFiles(
isFullCopyNeeded,
indexDir,
tmpIndexDir,
indexDirPath,
tmpIndexDirPath,
latestGeneration);
final long timeTakenSeconds = getReplicationTimeElapsed();
// The rate is left null (and logged as such) when the elapsed time is 0 seconds, which avoids
// a division by zero.
final Long bytesDownloadedPerSecond =
(timeTakenSeconds != 0 ? bytesDownloaded / timeTakenSeconds : null);
log.info(
"Total time taken for download (fullCopy={},bytesDownloaded={}) : {} secs ({} bytes/sec) to {}",
isFullCopyNeeded,
bytesDownloaded,
timeTakenSeconds,
bytesDownloadedPerSecond,
tmpIndexDir);
Collection<Map<String, Object>> modifiedConfFiles =
getModifiedConfFiles(confFilesToDownload);
if (!modifiedConfFiles.isEmpty()) {
// Configuration changed on the leader: download it and remember to reload the core.
// NOTE(review): unlike the else branch, terminateAndWaitFsyncService() is not called here
// directly; presumably downloadConfFiles() takes care of it — confirm.
reloadCore = true;
downloadConfFiles(confFilesToDownload, latestGeneration);
// Install: a full copy switches the index properties to the new directory name (and the
// temp directory must then be kept); an incremental copy moves the downloaded files into
// the current index directory.
if (isFullCopyNeeded) {
successfulInstall = solrCore.modifyIndexProps(tmpIdxDirName);
if (successfulInstall) deleteTmpIdxDir = false;
} else {
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
if (successfulInstall) {
if (isFullCopyNeeded) {
// let the system know we are changing dir's and the old one
// may be closed
// NOTE(review): doneWithDirectory(indexDir) is called again further down on the
// successful full-copy path — confirm the repeated call is harmless.
if (indexDir != null) {
if (!this.clearLocalIndexFirst) { // it was closed earlier
solrCore.getDirectoryFactory().doneWithDirectory(indexDir);
}
// Cleanup all index files not associated with any *named* snapshot.
solrCore.deleteNonSnapshotIndexFiles(indexDirPath);
}
}
log.info("Configuration files are modified, core will be reloaded");
// record the time of replication and the modified conf files
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);
}
} else {
// No configuration changes: finish outstanding fsync work, then install as above.
terminateAndWaitFsyncService();
if (isFullCopyNeeded) {
successfulInstall = solrCore.modifyIndexProps(tmpIdxDirName);
if (successfulInstall) deleteTmpIdxDir = false;
} else {
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
if (successfulInstall) {
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);
}
}
} finally {
// Runs whether or not the download/installation succeeded: re-enable searching and indexing
// on the core and, for an incremental copy, reopen the index writer closed above.
// NOTE(review): where these flags get disabled is not visible in this method.
solrCore.searchEnabled = true;
solrCore.indexEnabled = true;
if (!isFullCopyNeeded) {
solrCore.getUpdateHandler().getSolrCoreState().openIndexWriter(solrCore);
}
}
// we must reload the core after we open the IW back up
if (successfulInstall && (reloadCore || forceCoreReload)) {
if (log.isInfoEnabled()) {
log.info("Reloading SolrCore {}", solrCore.getName());
}
reloadCore();
}
if (successfulInstall) {
if (isFullCopyNeeded) {
// let the system know we are changing dir's and the old one
// may be closed
if (indexDir != null) {
log.info("removing old index directory {}", indexDir);
solrCore.getDirectoryFactory().doneWithDirectory(indexDir);
solrCore.getDirectoryFactory().remove(indexDir);
}
}
// After a full copy a new index writer is created (the argument is always true here).
if (isFullCopyNeeded) {
solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded);
}
openNewSearcherAndUpdateCommitPoint();
}
// An incremental attempt that was not forced and did not install: clean up now, mark the
// cleanup as done so the outer finally skips it, and retry once with a forced full copy.
if (!isFullCopyNeeded && !forceReplication && !successfulInstall) {
cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, tmpTlogDir, successfulInstall);
cleanupDone = true;
// we try with a full copy of the index
log.warn(
"Replication attempt was not successful - trying a full index replication reloadCore={}",
reloadCore);
// Only the success flag of the nested result is kept; its specific failure reason is dropped.
// NOTE(review): reloadCore is forwarded as forceCoreReload, so a caller-supplied
// forceCoreReload=true is not carried into the retry — confirm this is intended.
successfulInstall = fetchLatestIndex(true, reloadCore).getSuccessful();
}
markReplicationStop();
return successfulInstall
? IndexFetchResult.INDEX_FETCH_SUCCESS
: IndexFetchResult.INDEX_FETCH_FAILURE;
} catch (ReplicationHandlerException e) {
// Reported as a failed result rather than rethrown.
log.error("User aborted Replication", e);
return new IndexFetchResult(IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e);
} catch (SolrException e) {
// Already the exception type callers expect; propagate unchanged.
throw e;
} catch (InterruptedException e) {
// Rethrow with a descriptive message while keeping the original as the cause.
throw (InterruptedException)
(new InterruptedException("Index fetch interrupted").initCause(e));
} catch (Exception e) {
// Everything else is wrapped in a SolrException with SERVER_ERROR.
throw new SolrException(ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);
}
} finally {
// Runs on every exit from the outer try (early returns and exceptions included) unless
// cleanup() was already invoked before the full-copy retry.
if (!cleanupDone) {
cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, tmpTlogDir, successfulInstall);
}
}
}