in solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java [106:353]
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStart)
throws KeeperException, InterruptedException {
String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
ActionThrottle lt;
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
// shutdown or removed
return;
}
MDCLoggingContext.setCore(core);
lt = core.getUpdateHandler().getSolrCoreState().getLeaderThrottle();
}
try {
lt.minimumWaitBetweenActions();
lt.markAttemptingAction();
int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
log.debug(
"Running the leader process for shard={} and weAreReplacement={} and leaderVoteWait={}",
shardId,
weAreReplacement,
leaderVoteWait);
if (zkController
.getClusterState()
.getCollection(collection)
.getSlice(shardId)
.getNumLeaderReplicas()
> 1) {
// Clear the leader in clusterstate. We only need to worry about this if there is actually
// more than one replica.
MapWriter m =
ew ->
ew.put(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower())
.put(ZkStateReader.SHARD_ID_PROP, shardId)
.put(ZkStateReader.COLLECTION_PROP, collection);
if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
distributedClusterStateUpdater.doSingleStateUpdate(
DistributedClusterStateUpdater.MutatingCommand.SliceSetShardLeader,
new ZkNodeProps(m),
zkController.getSolrCloudManager(),
zkStateReader);
} else {
zkController.getOverseer().getStateUpdateQueue().offer(m);
}
}
if (!weAreReplacement) {
waitForReplicasToComeUp(leaderVoteWait);
}
if (isClosed) {
// Solr is shutting down or the ZooKeeper session expired while waiting for replicas. If the
// later, we cannot be sure we are still the leader, so we should bail out. The OnReconnect
// handler will re-register the cores and handle a new leadership election.
return;
}
Replica.Type replicaType;
String coreNodeName;
boolean setTermToMax = false;
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
return;
}
replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
// should I be leader?
ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
if (zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
if (!waitForEligibleBecomeLeaderAfterTimeout(
zkShardTerms, coreNodeName, leaderVoteWait)) {
rejoinLeaderElection(core);
return;
} else {
// only log an error if this replica win the election
setTermToMax = true;
}
}
if (isClosed) {
return;
}
log.info("I may be the new leader - try and sync");
// we are going to attempt to be the leader
// first cancel any current recovery
core.getUpdateHandler().getSolrCoreState().cancelRecovery();
if (weAreReplacement) {
// wait a moment for any floating updates to finish
try {
Thread.sleep(2500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
}
}
PeerSync.PeerSyncResult result = null;
boolean success = false;
try {
result = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
success = result.isSuccess();
} catch (Exception e) {
log.error("Exception while trying to sync", e);
result = PeerSync.PeerSyncResult.failure();
}
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
if (!success) {
boolean hasRecentUpdates = false;
if (ulog != null) {
// TODO: we could optimize this if necessary
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
}
}
if (!hasRecentUpdates) {
// we failed sync, but we have no versions - we can't sync in that case
// - we were active
// before, so become leader anyway if no one else has any versions either
if (result.getOtherHasVersions().orElse(false)) {
log.info(
"We failed sync, but we have no versions - we can't sync in that case. But others have some versions, so we should not become leader");
success = false;
} else {
log.info(
"We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
success = true;
}
}
}
// solrcloud_debug
if (log.isDebugEnabled()) {
try {
RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
SolrIndexSearcher searcher = searchHolder.get();
try {
if (log.isDebugEnabled()) {
log.debug(
"{} synched {}",
core.getCoreContainer().getZkController().getNodeName(),
searcher.count(new MatchAllDocsQuery()));
}
} finally {
searchHolder.decref();
}
} catch (Exception e) {
log.error("Error in solrcloud_debug block", e);
}
}
if (!success) {
rejoinLeaderElection(core);
return;
}
}
if (!isClosed) {
try {
if (replicaType.replicateFromLeader) {
// stop replicate from old leader
zkController.stopReplicationFromLeader(coreName);
}
if (replicaType == Replica.Type.TLOG) {
if (weAreReplacement) {
try (SolrCore core = cc.getCore(coreName)) {
Future<UpdateLog.RecoveryInfo> future =
core.getUpdateHandler().getUpdateLog().recoverFromCurrentLog();
if (future != null) {
log.info("Replaying tlog before become new leader");
future.get();
} else {
log.info("New leader does not have old tlog to replay");
}
}
}
}
// in case of leaderVoteWait timeout, a replica with lower term can win the election
if (setTermToMax) {
log.error(
"WARNING: Potential data loss -- Replica {} became leader after timeout (leaderVoteWait) {}",
"without being up-to-date with the previous leader",
coreNodeName);
zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreNodeName);
}
super.runLeaderProcess(weAreReplacement, 0);
try (SolrCore core = cc.getCore(coreName)) {
if (core != null) {
core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
publishActiveIfRegisteredAndNotActive(core);
} else {
return;
}
}
if (log.isInfoEnabled()) {
log.info(
"I am the new leader: {} {}", ZkCoreNodeProps.getCoreUrl(leaderProps), shardId);
}
// we made it as leader - send any recovery requests we need to
syncStrategy.requestRecoveries();
} catch (SessionExpiredException e) {
throw new SolrException(
ErrorCode.SERVER_ERROR,
"ZK session expired - cancelling election for " + collection + " " + shardId);
} catch (Exception e) {
log.error("There was a problem trying to register as the leader", e);
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
if (log.isDebugEnabled()) {
log.debug(
"SolrCore not found: {} in {}",
coreName,
CloudUtil.getLoadedCoreNamesAsString(cc));
}
return;
}
core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
// we could not publish ourselves as leader - try and rejoin election
try {
rejoinLeaderElection(core);
} catch (SessionExpiredException exc) {
throw new SolrException(
ErrorCode.SERVER_ERROR,
"ZK session expired - cancelling election for " + collection + " " + shardId);
}
}
}
} else {
cancelElection();
}
} finally {
MDCLoggingContext.clear();
}
}