in solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java [65:246]
/**
 * Creates a copy of every replica in {@code movements} on its mapped target node and, once all
 * copies have been created (and the required ones have recovered), removes the original replicas.
 *
 * <p>If creating any replica fails, or one of the waits times out, a best-effort attempt is made
 * to delete every replica created by this call and {@code false} is returned; the original
 * replicas are left untouched in that case.
 *
 * @param ccc context used to reach the cluster state, the ZK state reader and the closeable the
 *     latches are bound to
 * @param movements source replica mapped to the name of the node it should be moved to
 * @param parallel forwarded as the {@code "parallel"} property of each add-replica message
 * @param waitForFinalState when {@code true}, wait for every newly created replica rather than
 *     only for those replacing a leader
 * @param timeout maximum time, in seconds, for each of the two waits (replica creation, replica
 *     recovery)
 * @param asyncId async request id added to each add-replica message and passed on to the final
 *     cleanup; may be {@code null}
 * @param results receives a {@code "failure"} entry for each failed creation or failed cleanup
 * @return {@code false} if anything failed or timed out; otherwise whatever {@code
 *     cleanupReplicas} returns for the original replicas
 * @throws IOException propagated from the underlying collection commands
 * @throws InterruptedException if interrupted while waiting on one of the latches
 * @throws KeeperException propagated from the underlying collection commands
 */
static boolean migrateReplicas(
    CollectionCommandContext ccc,
    Map<Replica, String> movements,
    boolean parallel,
    boolean waitForFinalState,
    int timeout,
    String asyncId,
    NamedList<Object> results)
    throws IOException, InterruptedException, KeeperException {
  // how many leaders are we moving? for these replicas we have to make sure that either:
  // * another existing replica can become a leader, or
  // * we wait until the newly created replica completes recovery (and can become the new leader)
  // If waitForFinalState=true we wait for all replicas
  int numLeaders = 0;
  for (Replica replica : movements.keySet()) {
    if (replica.isLeader() || waitForFinalState) {
      numLeaders++;
    }
  }
  // Watchers grouped by the collection they were registered on. They must be removed using that
  // same collection name, so the name is tracked here instead of being derived from a
  // "collection_replica" style key.
  Map<String, List<CollectionStateWatcher>> watchersByCollection = new HashMap<>();
  List<ZkNodeProps> createdReplicas = new ArrayList<>();
  AtomicBoolean anyOneFailed = new AtomicBoolean(false);
  // counted down once per add-replica completion callback
  SolrCloseableLatch countDownLatch =
      new SolrCloseableLatch(movements.size(), ccc.getCloseableToLatchOn());
  // handed to the watchers created below; sized to the number of replicas we wait on
  SolrCloseableLatch replicasToRecover =
      new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn());
  ClusterState clusterState = ccc.getZkStateReader().getClusterState();
  try {
    for (Map.Entry<Replica, String> movement : movements.entrySet()) {
      Replica sourceReplica = movement.getKey();
      String targetNode = movement.getValue();
      String sourceCollection = sourceReplica.getCollection();
      if (log.isInfoEnabled()) {
        log.info(
            "Going to create replica for collection={} shard={} on node={}",
            sourceCollection,
            sourceReplica.getShard(),
            targetNode);
      }
      ZkNodeProps msg =
          sourceReplica
              .toFullProps()
              .plus("parallel", String.valueOf(parallel))
              .plus(CoreAdminParams.NODE, targetNode);
      if (asyncId != null) {
        msg.getProperties().put(ASYNC, asyncId);
      }
      NamedList<Object> nl = new NamedList<>();
      final ZkNodeProps addedReplica =
          new AddReplicaCmd(ccc)
              .addReplica(
                  clusterState,
                  msg,
                  nl,
                  () -> {
                    countDownLatch.countDown();
                    if (nl.get("failure") != null) {
                      String errorString =
                          String.format(
                              Locale.ROOT,
                              "Failed to create replica for collection=%s shard=%s on node=%s",
                              sourceCollection,
                              sourceReplica.getShard(),
                              targetNode);
                      log.warn(errorString);
                      // one replica creation failed. Make the best attempt to
                      // delete all the replicas created so far in the target
                      // and exit
                      synchronized (results) {
                        results.add("failure", errorString);
                        anyOneFailed.set(true);
                      }
                    } else {
                      if (log.isDebugEnabled()) {
                        log.debug(
                            "Successfully created replica for collection={} shard={} on node={}",
                            sourceCollection,
                            sourceReplica.getShard(),
                            targetNode);
                      }
                    }
                  })
              .get(0);
      if (addedReplica != null) {
        createdReplicas.add(addedReplica);
        if (sourceReplica.isLeader() || waitForFinalState) {
          String shardName = sourceReplica.getShard();
          String replicaName = sourceReplica.getName();
          String key = sourceCollection + "_" + replicaName;
          CollectionStateWatcher watcher;
          if (waitForFinalState) {
            watcher =
                new ActiveReplicaWatcher(
                    sourceCollection,
                    null,
                    Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)),
                    replicasToRecover);
          } else {
            watcher =
                new LeaderRecoveryWatcher(
                    sourceCollection,
                    shardName,
                    replicaName,
                    addedReplica.getStr(ZkStateReader.CORE_NAME_PROP),
                    replicasToRecover);
          }
          watchersByCollection
              .computeIfAbsent(sourceCollection, c -> new ArrayList<>())
              .add(watcher);
          log.debug("--- adding {}, {}", key, watcher);
          ccc.getZkStateReader().registerCollectionStateWatcher(sourceCollection, watcher);
        } else {
          log.debug("--- not waiting for {}", addedReplica);
        }
      }
    }
    log.debug("Waiting for replicas to be added");
    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
      log.info("Timed out waiting for replicas to be added");
      anyOneFailed.set(true);
    } else {
      log.debug("Finished waiting for replicas to be added");
    }
    // now wait for leader replicas to recover
    log.debug("Waiting for {} leader replicas to recover", numLeaders);
    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
      if (log.isInfoEnabled()) {
        log.info(
            "Timed out waiting for {} leader replicas to recover", replicasToRecover.getCount());
      }
      anyOneFailed.set(true);
    } else {
      log.debug("Finished waiting for leader replicas to recover");
    }
  } finally {
    // remove the watchers, we're done either way (including when an exception is propagating),
    // using the collection name each watcher was registered under
    for (Map.Entry<String, List<CollectionStateWatcher>> e : watchersByCollection.entrySet()) {
      for (CollectionStateWatcher watcher : e.getValue()) {
        ccc.getZkStateReader().removeCollectionStateWatcher(e.getKey(), watcher);
      }
    }
  }
  if (anyOneFailed.get()) {
    log.info("Failed to create some replicas. Cleaning up all newly created replicas.");
    SolrCloseableLatch cleanupLatch =
        new SolrCloseableLatch(createdReplicas.size(), ccc.getCloseableToLatchOn());
    for (ZkNodeProps createdReplica : createdReplicas) {
      NamedList<Object> deleteResult = new NamedList<>();
      try {
        new DeleteReplicaCmd(ccc)
            .deleteReplica(
                ccc.getZkStateReader().getClusterState(),
                createdReplica.plus("parallel", "true"),
                deleteResult,
                () -> {
                  cleanupLatch.countDown();
                  if (deleteResult.get("failure") != null) {
                    synchronized (results) {
                      results.add(
                          "failure",
                          "Could not cleanup, because of : " + deleteResult.get("failure"));
                    }
                  }
                });
      } catch (KeeperException e) {
        // best effort: keep deleting the remaining replicas
        cleanupLatch.countDown();
        log.warn("Error deleting replica ", e);
      } catch (Exception e) {
        log.warn("Error deleting replica ", e);
        cleanupLatch.countDown();
        throw e;
      }
    }
    if (!cleanupLatch.await(5, TimeUnit.MINUTES)) {
      log.warn("Timed out waiting for cleanup of newly created replicas");
    }
    return false;
  }
  // we have reached this far, meaning all replicas should have been recreated.
  // now cleanup the original replicas
  return cleanupReplicas(
      results, ccc.getZkStateReader().getClusterState(), movements.keySet(), ccc, asyncId);
}