in solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java [517:772]
/**
 * Runs the recovery loop for a replica: tries a PeerSync against the shard leader at most once,
 * and otherwise (or if PeerSync is skipped or fails) falls back to replication from the leader,
 * retrying until recovery succeeds, the thread is interrupted, or this strategy is closed.
 *
 * <p>Flow, as implemented below:
 *
 * <ul>
 *   <li>If the core has no {@link UpdateLog}, logs an error, calls {@code recoveryFailed} and
 *       returns.
 *   <li>PeerSync is skipped for {@code TLOG} replicas, and also when recovering after startup if
 *       an old buffer tlog exists (or checking for one throws).
 *   <li>Each loop iteration pings the leader, starts buffering updates, publishes {@code
 *       RECOVERING}, sends the prep-recovery command to the leader, pauses for {@code
 *       waitForUpdatesWithStaleStatePauseMilliSeconds}, then attempts PeerSync (first iteration
 *       only) and/or replication followed by a replay of buffered updates.
 *   <li>If the cloud descriptor reports this core as leader, publishes {@code ACTIVE} and returns
 *       without recovering.
 *   <li>After a successful attempt, replication from the leader is restarted when the replica
 *       type requires it, {@code ACTIVE} is published and the recovery listener is notified. If
 *       publishing fails the attempt is treated as unsuccessful and the loop continues.
 *   <li>After an unsuccessful attempt, {@code waitBetweenRecoveries} decides whether to retry or
 *       to leave the loop.
 * </ul>
 *
 * @param core the core to recover
 * @throws Exception declared by the signature; failures inside the retry loop are caught and
 *     logged, so exceptions can only escape from the setup code that precedes the loop
 */
public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
  final RTimer timer = new RTimer();
  boolean successfulRecovery = false;

  UpdateLog ulog;
  ulog = core.getUpdateHandler().getUpdateLog();
  if (ulog == null) {
    log.error("No UpdateLog found - cannot recover.");
    recoveryFailed(zkController, this.coreDescriptor);
    return;
  }

  // we temporarily ignore peersync for tlog replicas
  boolean firstTime = replicaType != Replica.Type.TLOG;

  List<Long> recentVersions;
  try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
    recentVersions = recentUpdates.getVersions(ulog.getNumRecordsToKeep());
  } catch (Exception e) {
    log.error("Corrupt tlog - ignoring.", e);
    recentVersions = new ArrayList<>(0);
  }

  List<Long> startingVersions = ulog.getStartingVersions();

  // Diagnostic logging: compare the versions seen at startup with the current ones to report how
  // many new versions were added since startup.
  if (startingVersions != null && recoveringAfterStartup) {
    try {
      int oldIdx = 0; // index of the start of the old list in the current list
      long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;

      for (; oldIdx < recentVersions.size(); oldIdx++) {
        if (recentVersions.get(oldIdx) == firstStartingVersion) break;
      }

      if (oldIdx > 0) {
        log.info("Found new versions added after startup: num=[{}]", oldIdx);
        if (log.isInfoEnabled()) {
          log.info(
              "currentVersions size={} range=[{} to {}]",
              recentVersions.size(),
              recentVersions.get(0),
              recentVersions.get(recentVersions.size() - 1));
        }
      }

      if (startingVersions.isEmpty()) {
        log.info("startupVersions is empty");
      } else {
        if (log.isInfoEnabled()) {
          log.info(
              "startupVersions size={} range=[{} to {}]",
              startingVersions.size(),
              startingVersions.get(0),
              startingVersions.get(startingVersions.size() - 1));
        }
      }
    } catch (Exception e) {
      log.error("Error getting recent versions.", e);
      recentVersions = new ArrayList<>(0);
    }
  }

  if (recoveringAfterStartup) {
    // if we're recovering after startup (i.e. we have been down), then we need to know what the
    // last versions were when we went down. We may have received updates since then.
    // NOTE(review): startingVersions can be null here (it is null-checked above) and is later
    // handed to PeerSyncWithLeader.sync(...) - confirm that a null list is tolerated there.
    recentVersions = startingVersions;
    try {
      if (ulog.existOldBufferLog()) {
        // this means we were previously doing a full index replication that probably didn't
        // complete and buffering updates in the meantime.
        log.info(
            "Looks like a previous replication recovery did not complete - skipping peer sync.");
        firstTime = false; // skip peersync
      }
    } catch (Exception e) {
      log.error("Error trying to get ulog starting operation.", e);
      firstTime = false; // skip peersync
    }
  }

  if (replicaType.replicateFromLeader) {
    zkController.stopReplicationFromLeader(coreName);
  }

  final String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
  // Holds the result of replay(core); it is assigned below but not awaited in this method.
  Future<RecoveryInfo> replayFuture = null;
  // don't use interruption or it will close channels though
  while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) {
    try {
      CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
      final Replica leader = pingLeader(ourUrl, this.coreDescriptor, true);
      if (isClosed()) {
        log.info("RecoveryStrategy has been closed");
        break;
      }

      boolean isLeader = leader.getCoreUrl().equals(ourUrl);
      if (isLeader && !cloudDesc.isLeader()) {
        throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
      }
      if (cloudDesc.isLeader()) {
        // we are now the leader - no one else must have been suitable
        log.warn("We have not yet recovered - but we are now the leader!");
        log.info("Finished recovery process.");
        zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
        return;
      }

      log.info("Begin buffering updates. core=[{}]", coreName);
      // recalling buffer updates will drop the old buffer tlog
      ulog.bufferUpdates();

      if (log.isInfoEnabled()) {
        log.info(
            "Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]",
            core.getName(),
            leader.getCoreUrl(),
            ourUrl);
      }
      zkController.publish(this.coreDescriptor, Replica.State.RECOVERING);

      final Slice slice =
          zkStateReader
              .getClusterState()
              .getCollection(cloudDesc.getCollectionName())
              .getSlice(cloudDesc.getShardId());

      // drop any prep-recovery command still pending before issuing a new one
      cancelPrepRecoveryCmd();

      if (isClosed()) {
        log.info("RecoveryStrategy has been closed");
        break;
      }

      sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getCoreName(), slice);

      if (isClosed()) {
        log.info("RecoveryStrategy has been closed");
        break;
      }

      // we wait a bit so that any updates on the leader
      // that started before they saw recovering state
      // are sure to have finished (see SOLR-7141 for
      // discussion around current value)
      // TODO since SOLR-11216, we probably won't need this
      try {
        Thread.sleep(waitForUpdatesWithStaleStatePauseMilliSeconds);
      } catch (InterruptedException e) {
        // restore the flag; the loop condition checks it on the next iteration
        Thread.currentThread().interrupt();
      }

      // first thing we just try to sync
      if (firstTime) {
        firstTime = false; // only try sync the first time through the loop
        if (log.isInfoEnabled()) {
          log.info(
              "Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]",
              leader.getCoreUrl(),
              recoveringAfterStartup);
        }
        // System.out.println("Attempting to PeerSync from " + leaderUrl
        // + " i am:" + zkController.getNodeName());
        boolean syncSuccess;
        try (PeerSyncWithLeader peerSyncWithLeader =
            new PeerSyncWithLeader(core, leader.getCoreUrl(), ulog.getNumRecordsToKeep())) {
          syncSuccess = peerSyncWithLeader.sync(recentVersions).isSuccess();
        }
        if (syncSuccess) {
          SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
          try {
            // force open a new searcher
            core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
          } finally {
            // always release the request, even if the commit throws
            req.close();
          }
          log.info("PeerSync stage of recovery was successful.");

          // solrcloud_debug
          cloudDebugLog(core, "synced");

          log.info("Replaying updates buffered during PeerSync.");
          replayFuture = replay(core);

          // sync success
          successfulRecovery = true;
          // the finally block below still runs and publishes ACTIVE before the loop is left
          break;
        }

        log.info("PeerSync Recovery was not successful - trying replication.");
      }

      if (isClosed()) {
        log.info("RecoveryStrategy has been closed");
        break;
      }

      log.info("Starting Replication Recovery.");

      try {
        replicate(zkController.getNodeName(), core, leader);

        if (isClosed()) {
          log.info("RecoveryStrategy has been closed");
          break;
        }

        replayFuture = replay(core);

        if (isClosed()) {
          log.info("RecoveryStrategy has been closed");
          break;
        }

        log.info("Replication Recovery was successful.");
        successfulRecovery = true;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.warn("Recovery was interrupted", e);
        close = true;
      } catch (Exception e) {
        log.error("Error while trying to recover", e);
      }

    } catch (Exception e) {
      log.error("Error while trying to recover. core={}", coreName, e);
    } finally {
      if (successfulRecovery) {
        log.info("Registering as Active after recovery.");
        try {
          if (replicaType.replicateFromLeader) {
            zkController.startReplicationFromLeader(coreName, true);
          }
          zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
        } catch (Exception e) {
          log.error("Could not publish as ACTIVE after successful recovery", e);
          // treat the attempt as failed so the loop retries
          successfulRecovery = false;
        }

        if (successfulRecovery) {
          close = true;
          recoveryListener.recovered();
        }
      }
    }

    if (!successfulRecovery) {
      // a true return value from waitBetweenRecoveries ends the retry loop
      if (waitBetweenRecoveries(core.getName())) break;
    }
  }

  if (log.isInfoEnabled()) {
    log.info(
        "Finished recovery process, successful=[{}] msTimeTaken={}",
        successfulRecovery,
        timer.getTime());
  }
}