in solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java [170:591]
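/**
 * Entry point for the REINDEXCOLLECTION collection API command. Depending on the "cmd"
 * parameter this either starts a reindex of the source collection into a (possibly
 * temporary) target collection, reports the status of a running reindex, or requests
 * that a running reindex be aborted.
 *
 * Illustrative request (parameter names per the Collections API; values are examples):
 * /admin/collections?action=REINDEXCOLLECTION&name=mycollection&cmd=start
 */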
public void call(ClusterState clusterState, ZkNodeProps message, NamedList<Object> results)
throws Exception {
log.debug("*** called: {}", message);
String extCollection = message.getStr(CommonParams.NAME);
if (extCollection == null) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST, "Source collection name must be specified");
}
boolean followAliases = message.getBool(FOLLOW_ALIASES, false);
String collection;
if (followAliases) {
collection =
ccc.getSolrCloudManager().getClusterStateProvider().resolveSimpleAlias(extCollection);
} else {
collection = extCollection;
}
if (!clusterState.hasCollection(collection)) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST, "Source collection " + collection + " does not exist");
}
String target = message.getStr(TARGET);
if (target == null) {
target = collection;
} else {
if (followAliases) {
target = ccc.getSolrCloudManager().getClusterStateProvider().resolveSimpleAlias(target);
}
}
boolean sameTarget = target.equals(collection) || target.equals(extCollection);
boolean removeSource = message.getBool(REMOVE_SOURCE, false);
Cmd command = Cmd.get(message.getStr(COMMAND, Cmd.START.toLower()));
if (command == null) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST, "Unknown command: " + message.getStr(COMMAND));
}
Map<String, Object> reindexingState =
getReindexingState(ccc.getSolrCloudManager().getDistribStateManager(), collection);
if (!reindexingState.containsKey(STATE)) {
reindexingState.put(STATE, State.IDLE.toLower());
}
State state = State.get(reindexingState.get(STATE));
if (command == Cmd.ABORT) {
// check that it's actually running
if (state != State.RUNNING) {
log.debug(
"Abort requested for collection {} but command is not running: {}", collection, state);
return;
}
log.info("Abort requested for collection {}, setting the state to ABORTED.", collection);
setReindexingState(collection, State.ABORTED, null);
reindexingState.put(STATE, "aborting");
results.add(REINDEX_STATUS, reindexingState);
// any needed cleanup will be performed by the running instance of the command
return;
} else if (command == Cmd.STATUS) {
results.add(REINDEX_STATUS, reindexingState);
return;
}
// command == Cmd.START
// check it's not already running
if (state == State.RUNNING) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST,
"Reindex is already running for collection "
+ collection
+ ". If you are sure this is not the case you can issue &cmd=abort to clean up this state.");
}
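// Gather the reindexing parameters; anything not specified in the request defaults to the
// source collection's current settings.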
DocCollection coll = clusterState.getCollection(collection);
boolean aborted = false;
int batchSize = message.getInt(CommonParams.ROWS, 100);
String query = message.getStr(CommonParams.Q, "*:*");
String fl = message.getStr(CommonParams.FL, "*");
Integer rf = message.getInt(ZkStateReader.REPLICATION_FACTOR, coll.getReplicationFactor());
ReplicaCount numReplicas = ReplicaCount.fromMessage(message, coll);
int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, coll.getActiveSlices().size());
DocRouter router = coll.getRouter();
if (router == null) {
router = DocRouter.DEFAULT;
}
String configName = message.getStr(ZkStateReader.CONFIGNAME_PROP, coll.getConfigName());
String targetCollection;
int seq = tmpCollectionSeq.getAndIncrement();
if (sameTarget) {
// pick a temporary target name that doesn't clash with an existing collection
do {
targetCollection = TARGET_COL_PREFIX + extCollection + "_" + seq;
if (!clusterState.hasCollection(targetCollection)) {
break;
}
seq = tmpCollectionSeq.getAndIncrement();
} while (true);
} else {
targetCollection = target;
}
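// The checkpoint collection records the progress of the topic() stream, so that copying
// can resume from the last committed checkpoint instead of starting over.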
String chkCollection = CHK_COL_PREFIX + extCollection;
Replica daemonReplica = null;
Exception exc = null;
boolean createdTarget = false;
try {
// set the running flag
reindexingState.clear();
reindexingState.put("actualSourceCollection", collection);
reindexingState.put("actualTargetCollection", targetCollection);
reindexingState.put("checkpointCollection", chkCollection);
reindexingState.put("inputDocs", getNumberOfDocs(collection));
reindexingState.put(PHASE, "creating target and checkpoint collections");
setReindexingState(collection, State.RUNNING, reindexingState);
// 0. set up target and checkpoint collections
NamedList<Object> cmdResults = new NamedList<>();
ZkNodeProps cmd;
if (clusterState.hasCollection(targetCollection)) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Target collection " + targetCollection + " already exists! Delete it first.");
}
if (clusterState.hasCollection(chkCollection)) {
// delete the checkpoint collection
cmd =
new ZkNodeProps(
Overseer.QUEUE_OPERATION,
CollectionParams.CollectionAction.DELETE.toLower(),
CommonParams.NAME,
chkCollection);
new DeleteCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
CollectionHandlingUtils.checkResults(
"deleting old checkpoint collection " + chkCollection, cmdResults, true);
}
if (maybeAbort(collection)) {
aborted = true;
return;
}
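// Build the CREATE request for the target collection, inheriting shard count, router and
// config settings from the source, then applying any per-request overrides.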
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower());
propMap.put(CommonParams.NAME, targetCollection);
propMap.put(ZkStateReader.NUM_SHARDS_PROP, numShards);
propMap.put(CollectionAdminParams.COLL_CONF, configName);
// init first from the same router
propMap.put("router.name", router.getName());
for (String key : coll.keySet()) {
if (key.startsWith("router.")) {
propMap.put(key, coll.get(key));
}
}
// then apply overrides if present
for (String key : message.keySet()) {
if (key.startsWith("router.")) {
propMap.put(key, message.getStr(key));
} else if (COLLECTION_PARAMS.contains(key)) {
propMap.put(key, message.get(key));
}
}
propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);
if (rf != null) {
propMap.put(ZkStateReader.REPLICATION_FACTOR, rf);
}
numReplicas.writeProps(propMap);
// create the target collection
cmd = new ZkNodeProps(propMap);
cmdResults = new NamedList<>();
new CreateCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
createdTarget = true;
CollectionHandlingUtils.checkResults(
"creating target collection " + targetCollection, cmdResults, true);
// create the checkpoint collection - use RF=1 and 1 shard
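// (the checkpoint data is tiny, so a single shard with a single replica is sufficient)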
cmd =
new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
CommonParams.NAME, chkCollection,
ZkStateReader.NUM_SHARDS_PROP, "1",
ZkStateReader.REPLICATION_FACTOR, "1",
CollectionAdminParams.COLL_CONF, "_default",
CommonAdminParams.WAIT_FOR_FINAL_STATE, "true");
cmdResults = new NamedList<>();
new CreateCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
CollectionHandlingUtils.checkResults(
"creating checkpoint collection " + chkCollection, cmdResults, true);
// wait until both collections are visible in the cluster state
try {
for (String col : List.of(targetCollection, chkCollection)) {
ccc.getZkStateReader().waitForState(col, 30, TimeUnit.SECONDS, Objects::nonNull);
}
} catch (TimeoutException e) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR, "Could not fully create temporary collection(s)");
}
clusterState = ccc.getSolrCloudManager().getClusterState();
if (maybeAbort(collection)) {
aborted = true;
return;
}
// 1. put the source collection in read-only mode
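// marking the source read-only rejects further updates, so no documents can be added or
// changed while they are being copied to the target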
cmd =
new ZkNodeProps(
Overseer.QUEUE_OPERATION,
CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP,
collection,
ZkStateReader.READ_ONLY,
"true");
if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
ccc.getDistributedClusterStateUpdater()
.doSingleStateUpdate(
DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection,
cmd,
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
ccc.offerStateUpdate(cmd);
}
TestInjection.injectReindexLatch();
if (maybeAbort(collection)) {
aborted = true;
return;
}
// 2. copy the documents to target
// Recipe taken from:
// http://joelsolr.blogspot.com/2016/10/solr-63-batch-jobs-parallel-etl-and.html
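// The expression below composes four streaming decorators: topic() incrementally reads all
// documents matching the query from the source collection (tracking its position in the
// checkpoint collection), update() sends each batch to the target collection, commit()
// commits them, and daemon(terminate=true) runs the pipeline in the background until the
// topic is exhausted. Rendered (illustrative, with example values):
// daemon(id="tgt",terminate="true",commit(tgt,update(tgt,batchSize=100,
//   topic(chk,src,q="*:*",fl="*",id="topic_tgt",rows="100",initialCheckpoint="0"))))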
ModifiableSolrParams q = new ModifiableSolrParams();
q.set(CommonParams.QT, "/stream");
q.set("collection", collection);
q.set(
"expr",
"daemon(id=\""
+ targetCollection
+ "\","
+ "terminate=\"true\","
+ "commit("
+ targetCollection
+ ","
+ "update("
+ targetCollection
+ ","
+ "batchSize="
+ batchSize
+ ","
+ "topic("
+ chkCollection
+ ","
+ collection
+ ","
+ "q=\""
+ query
+ "\","
+ "fl=\""
+ fl
+ "\","
+ "id=\"topic_"
+ targetCollection
+ "\","
+
// limit rows per fetch: some documents, e.g. in .system, contain large blobs
"rows=\""
+ batchSize
+ "\","
+ "initialCheckpoint=\"0\"))))");
log.debug("- starting copying documents from {} to {}", collection, targetCollection);
SolrResponse rsp;
try {
rsp = new QueryRequest(q).process(ccc.getSolrCloudManager().getSolrClient());
} catch (Exception e) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Unable to copy documents from " + collection + " to " + targetCollection,
e);
}
daemonReplica = getReplicaForDaemon(rsp, coll);
if (daemonReplica == null) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Unable to copy documents from "
+ collection
+ " to "
+ targetCollection
+ ": "
+ Utils.toJSONString(rsp));
}
reindexingState.put("daemonUrl", daemonReplica.getCoreUrl());
reindexingState.put("daemonName", targetCollection);
reindexingState.put(PHASE, "copying documents");
setReindexingState(collection, State.RUNNING, reindexingState);
// wait for the daemon to finish
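// (waitForDaemon, defined elsewhere in this class, polls the replica hosting the daemon
// until the daemon terminates)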
waitForDaemon(targetCollection, daemonReplica, collection, targetCollection, reindexingState);
if (maybeAbort(collection)) {
aborted = true;
return;
}
log.debug("- finished copying from {} to {}", collection, targetCollection);
// test hook: optionally inject a failure here (or earlier, during the daemon run)
TestInjection.injectReindexFailure();
// 3. if (sameTarget) set up an alias so the original name resolves to targetCollection
if (sameTarget) {
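// reindexing happened into a temporary collection, so point an alias with the original
// (requested) name at the temporary collection that now holds the data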
log.debug("- setting up alias from {} to {}", extCollection, targetCollection);
cmd = new ZkNodeProps(CommonParams.NAME, extCollection, "collections", targetCollection);
cmdResults = new NamedList<>();
new CreateAliasCmd(ccc).call(clusterState, cmd, cmdResults);
CollectionHandlingUtils.checkResults(
"setting up alias " + extCollection + " -> " + targetCollection, cmdResults, true);
reindexingState.put("alias", extCollection + " -> " + targetCollection);
}
reindexingState.remove("daemonUrl");
reindexingState.remove("daemonName");
reindexingState.put("processedDocs", getNumberOfDocs(targetCollection));
reindexingState.put(PHASE, "copying done, finalizing");
setReindexingState(collection, State.RUNNING, reindexingState);
if (maybeAbort(collection)) {
aborted = true;
return;
}
// 4. delete the checkpoint collection
log.debug("- deleting {}", chkCollection);
cmd =
new ZkNodeProps(
Overseer.QUEUE_OPERATION,
CollectionParams.CollectionAction.DELETE.toLower(),
CommonParams.NAME,
chkCollection);
cmdResults = new NamedList<>();
new DeleteCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
CollectionHandlingUtils.checkResults(
"deleting checkpoint collection " + chkCollection, cmdResults, true);
// 5. optionally delete the source collection
if (removeSource) {
log.debug("- deleting source collection");
cmd =
new ZkNodeProps(
Overseer.QUEUE_OPERATION,
CollectionParams.CollectionAction.DELETE.toLower(),
CommonParams.NAME,
collection,
FOLLOW_ALIASES,
"false");
cmdResults = new NamedList<>();
new DeleteCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
CollectionHandlingUtils.checkResults(
"deleting source collection " + collection, cmdResults, true);
} else {
// 6. clear readOnly on source
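// (a null property value in MODIFYCOLLECTION removes the property from the collection)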
ZkNodeProps props =
new ZkNodeProps(
Overseer.QUEUE_OPERATION,
CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP,
collection,
ZkStateReader.READ_ONLY,
null);
if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
ccc.getDistributedClusterStateUpdater()
.doSingleStateUpdate(
DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection,
props,
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
ccc.offerStateUpdate(props);
}
}
// 7. set FINISHED state on the target and clear the state on the source
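// the per-collection reindexing state kept in ZooKeeper is removed below, so subsequent
// STATUS calls report an idle state; the target keeps a REINDEXING_STATE=finished marker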
ZkNodeProps props =
new ZkNodeProps(
Overseer.QUEUE_OPERATION,
CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP,
targetCollection,
REINDEXING_STATE,
State.FINISHED.toLower());
if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
ccc.getDistributedClusterStateUpdater()
.doSingleStateUpdate(
DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection,
props,
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
ccc.offerStateUpdate(props);
}
reindexingState.put(STATE, State.FINISHED.toLower());
reindexingState.put(PHASE, "done");
removeReindexingState(collection);
} catch (Exception e) {
log.warn("Error during reindexing of {}", extCollection, e);
exc = e;
aborted = true;
} finally {
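// on abort or failure, cleanup() (defined elsewhere in this class) stops the daemon,
// deletes the checkpoint collection, deletes the target collection if this command
// created it, and clears the read-only flag on the source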
if (aborted) {
cleanup(
collection,
targetCollection,
chkCollection,
daemonReplica,
targetCollection,
createdTarget);
if (exc != null) {
results.add("error", exc.toString());
}
reindexingState.put(STATE, State.ABORTED.toLower());
}
results.add(REINDEX_STATUS, reindexingState);
}
}