in solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java [107:488]
public void call(ClusterState clusterState, ZkNodeProps message, NamedList<Object> results)
throws Exception {
if (ccc.getZkStateReader().aliasesManager != null) { // not a mock ZkStateReader
ccc.getZkStateReader().aliasesManager.update();
}
final Aliases aliases = ccc.getZkStateReader().getAliases();
final String collectionName = message.getStr(NAME);
final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
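// If 'alias' differs from the collection name, an alias pointing at the new collection is
// created once the collection has been successfully built (see the end of this method).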
final String alias = message.getStr(ALIAS, collectionName);
log.info("Create collection {}", collectionName);
boolean prsDefault = EnvUtils.getPropertyAsBool(PRS_DEFAULT_PROP, false);
final boolean isPRS = message.getBool(CollectionStateProps.PER_REPLICA_STATE, prsDefault);
if (log.isInfoEnabled()) {
log.info(
"solr.prs.default : {} and collection prs : {}, isPRS : {}",
System.getProperty("solr.prs.default", null),
message.getStr(CollectionStateProps.PER_REPLICA_STATE),
isPRS);
}
if (clusterState.hasCollection(collectionName)) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
}
if (aliases.hasAlias(collectionName)) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST,
"collection alias already exists: " + collectionName);
}
String configName = getConfigName(collectionName, message);
if (configName == null) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST,
"No config set found to associate with the collection.");
}
CollectionHandlingUtils.validateConfigOrThrowSolrException(
ccc.getCoreContainer().getConfigSetService(), configName);
String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
// fail fast if parameters are wrong or incomplete
List<String> shardNames = populateShardNames(message, router);
ReplicaCount numReplicas = getNumReplicas(message);
DocCollection newColl = null;
final String collectionPath = DocCollection.getCollectionPath(collectionName);
try {
final String async = message.getStr(ASYNC);
ZkStateReader zkStateReader = ccc.getZkStateReader();
message.getProperties().put(COLL_CONF, configName);
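// Extract message properties prefixed with ZkController.COLLECTION_PARAM_PREFIX; the prefix is
// stripped and the remaining key/value pairs are handed to createCollectionZkNode below.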
Map<String, String> collectionParams = new HashMap<>();
Map<String, Object> collectionProps = message.getProperties();
for (Map.Entry<String, Object> entry : collectionProps.entrySet()) {
String propName = entry.getKey();
if (propName.startsWith(ZkController.COLLECTION_PARAM_PREFIX)) {
collectionParams.put(
propName.substring(ZkController.COLLECTION_PARAM_PREFIX.length()),
(String) entry.getValue());
}
}
createCollectionZkNode(
ccc.getSolrCloudManager().getDistribStateManager(),
collectionName,
collectionParams,
ccc.getCoreContainer().getConfigSetService());
// Note that in code below there are two main execution paths: Overseer based cluster state
// updates and distributed cluster state updates (look for isDistributedStateUpdate()
// conditions).
//
// PerReplicaStates (PRS) collections follow a hybrid approach. Even when the cluster uses
// Overseer-based cluster state updates, these collections are created locally and the cluster
// state updater is then notified (look for usage of RefreshCollectionMessage). This explains why
// PRS collections have fewer diverging execution paths between distributed and Overseer-based
// cluster state updates.
if (isPRS) {
// In case of a PRS collection, create the collection structure directly instead of
// resubmitting to the overseer queue: this code updates ZooKeeper directly by creating the
// collection's state.json. It is compatible with both distributed cluster state updates and
// Overseer based cluster state updates.
// TODO: Consider doing this for all collections, not just PRS collections; this would be
// achieved by switching the cluster to distributed state updates.
ZkWriteCommand command =
new ClusterStateMutator(ccc.getSolrCloudManager())
.createCollection(clusterState, message);
byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
ccc.getZkStateReader()
.getZkClient()
.create(collectionPath, data, CreateMode.PERSISTENT, true);
clusterState = clusterState.copyWith(collectionName, command.collection);
newColl = command.collection;
ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
} else {
if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
// The message has been crafted by CollectionsHandler.CollectionOperation.CREATE_OP and
// defines the QUEUE_OPERATION to be CollectionParams.CollectionAction.CREATE.
ccc.getDistributedClusterStateUpdater()
.doSingleStateUpdate(
DistributedClusterStateUpdater.MutatingCommand.ClusterCreateCollection,
message,
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
ccc.offerStateUpdate(Utils.toJSON(message));
}
// wait for a while until we see the collection
try {
newColl =
zkStateReader.waitForState(collectionName, 30, TimeUnit.SECONDS, Objects::nonNull);
} catch (TimeoutException e) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Could not fully create collection: " + collectionName,
e);
}
// refresh cluster state (value read below comes from Zookeeper watch firing following the
// update done previously, be it by Overseer or by this thread when updates are distributed)
clusterState = ccc.getSolrCloudManager().getClusterState();
}
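// Compute which node each replica of each shard should be placed on.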
final List<ReplicaPosition> replicaPositions;
try {
replicaPositions =
buildReplicaPositions(
ccc.getCoreContainer(),
ccc.getSolrCloudManager(),
clusterState,
message,
shardNames,
numReplicas);
} catch (Assign.AssignmentException e) {
ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName);
new DeleteCollectionCmd(ccc).call(clusterState, deleteMessage, results);
// unwrap the exception
throw new SolrException(ErrorCode.BAD_REQUEST, e.getMessage(), e.getCause());
}
if (replicaPositions.isEmpty()) {
log.debug("Finished create command for collection: {}", collectionName);
return;
}
final ShardRequestTracker shardRequestTracker =
CollectionHandlingUtils.asyncRequestTracker(async, ccc);
if (log.isDebugEnabled()) {
log.debug(
formatString(
"Creating SolrCores for new collection {0}, shardNames {1} , message : {2}",
collectionName, shardNames, message));
}
Map<String, ShardRequest> coresToCreate = new LinkedHashMap<>();
ShardHandler shardHandler = ccc.newShardHandler();
final DistributedClusterStateUpdater.StateChangeRecorder scr;
// PRS collections update ZooKeeper directly, so even if we run with distributed state updates,
// there is nothing to update in state.json for such collections in the loop over replica
// positions below.
if (!isPRS && ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
// The collection got created. Now we're adding replicas (and will update ZK only once when
// done adding).
scr =
ccc.getDistributedClusterStateUpdater()
.createStateChangeRecorder(collectionName, false);
} else {
scr = null;
}
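// For each computed replica position: register the replica in cluster state (directly for PRS,
// via the recorder or the Overseer queue otherwise) and prepare, but do not yet send, the
// core admin CREATE request for it.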
for (ReplicaPosition replicaPosition : replicaPositions) {
String nodeName = replicaPosition.node;
String coreName =
Assign.buildSolrCoreName(
ccc.getSolrCloudManager().getDistribStateManager(),
collectionName,
replicaPosition.shard,
replicaPosition.type);
if (log.isDebugEnabled()) {
log.debug(
formatString(
"Creating core {0} as part of shard {1} of collection {2} on {3}",
coreName, replicaPosition.shard, collectionName, nodeName));
}
String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
// create the replica in the collection's state.json in ZK prior to creating the core.
// Otherwise the core creation fails
ZkNodeProps props =
new ZkNodeProps(
Overseer.QUEUE_OPERATION,
ADDREPLICA.toString(),
ZkStateReader.COLLECTION_PROP,
collectionName,
ZkStateReader.SHARD_ID_PROP,
replicaPosition.shard,
ZkStateReader.CORE_NAME_PROP,
coreName,
ZkStateReader.STATE_PROP,
Replica.State.DOWN.toString(),
ZkStateReader.NODE_NAME_PROP,
nodeName,
ZkStateReader.BASE_URL_PROP,
baseUrl,
ZkStateReader.REPLICA_TYPE,
replicaPosition.type.name(),
CommonAdminParams.WAIT_FOR_FINAL_STATE,
Boolean.toString(waitForFinalState));
if (isPRS) {
// In case of a PRS collection, execute the ADDREPLICA directly instead of resubmitting
// to the overseer queue.
// TODO: Consider doing this for all collections, not just the PRS collections.
// TODO: consider doing this once after the loop for all replicas rather than writing
// state.json repeatedly
// This PRS specific code is compatible with both Overseer and distributed cluster state
// update strategies
ZkWriteCommand command =
new SliceMutator(ccc.getSolrCloudManager()).addReplica(clusterState, props);
clusterState = clusterState.copyWith(collectionName, command.collection);
newColl = command.collection;
} else {
if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
scr.record(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, props);
} else {
ccc.offerStateUpdate(Utils.toJSON(props));
}
}
// Need to create new params for each request
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
params.set(CoreAdminParams.NAME, coreName);
params.set(COLL_CONF, configName);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, replicaPosition.shard);
params.set(ZkStateReader.NUM_SHARDS_PROP, shardNames.size());
params.set(CoreAdminParams.NEW_COLLECTION, "true");
params.set(CoreAdminParams.REPLICA_TYPE, replicaPosition.type.name());
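// For async collection API requests, give each core CREATE its own async id and track it so
// that completion can later be polled per node.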
if (async != null) {
String coreAdminAsyncId = async + Math.abs(System.nanoTime());
params.add(ASYNC, coreAdminAsyncId);
shardRequestTracker.track(nodeName, coreAdminAsyncId);
}
CollectionHandlingUtils.addPropertyParams(message, params);
ShardRequest sreq = new ShardRequest();
sreq.nodeName = nodeName;
params.set("qt", ccc.getAdminPath());
sreq.purpose = ShardRequest.PURPOSE_PRIVATE;
sreq.shards = new String[] {baseUrl};
sreq.actualShards = sreq.shards;
sreq.params = params;
coresToCreate.put(coreName, sreq);
}
// Update the state.json for PRS collection in a single operation
if (isPRS) {
byte[] data =
Utils.toJSON(
Collections.singletonMap(
collectionName, clusterState.getCollection(collectionName)));
zkStateReader.getZkClient().setData(collectionPath, data, true);
}
// Distributed updates don't need to do anything for PRS collections that wrote state.json
// directly. For non PRS collections, distributed updates have to be executed if that's how
// the cluster is configured
if (!isPRS && ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
// Add the replicas to the collection state (all at once after the loop above)
scr.executeStateUpdates(ccc.getSolrCloudManager(), ccc.getZkStateReader());
}
final Map<String, Replica> replicas;
if (isPRS) {
replicas = new ConcurrentHashMap<>();
// Only the elements that were asked for...
newColl.getSlices().stream()
.flatMap(slice -> slice.getReplicas().stream())
.filter(r -> coresToCreate.containsKey(r.getCoreName()))
.forEach(r -> replicas.putIfAbsent(r.getCoreName(), r)); // ...get added to the map
ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
} else {
// wait for all replica entries to be created and visible in local cluster state (updated by
// ZK watches)
replicas =
CollectionHandlingUtils.waitToSeeReplicasInState(
ccc.getZkStateReader(),
ccc.getSolrCloudManager().getTimeSource(),
collectionName,
coresToCreate.keySet());
}
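// Attach each replica's coreNodeName (now known from the cluster state) to its CREATE request
// and dispatch the requests to the target nodes.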
for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
ShardRequest sreq = e.getValue();
sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
shardHandler.submit(sreq, sreq.shards[0], sreq.params);
}
shardRequestTracker.processResponses(
results, shardHandler, false, null, Collections.emptySet());
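// Any 'failure' entry in the collected responses means at least one core could not be created.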
boolean failure =
results.get("failure") != null
&& ((SimpleOrderedMap<?>) results.get("failure")).size() > 0;
if (isPRS) {
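// For PRS collections, poll the per-replica state entries in ZooKeeper until all replicas
// report ACTIVE or the timeout expires; a timeout is treated as a failure below.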
TimeOut timeout =
new TimeOut(
Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120),
TimeUnit.SECONDS,
ccc.getSolrCloudManager().getTimeSource()); // could be a big cluster
PerReplicaStates prs =
PerReplicaStatesOps.fetch(collectionPath, ccc.getZkStateReader().getZkClient(), null);
while (!timeout.hasTimedOut()) {
if (prs.allActive()) break;
Thread.sleep(100);
prs =
PerReplicaStatesOps.fetch(collectionPath, ccc.getZkStateReader().getZkClient(), null);
}
if (!prs.allActive()) {
// not all replicas reached ACTIVE before the timeout: treat the create as failed
failure = true;
}
}
if (failure) {
// Let's cleanup as we hit an exception
// We shouldn't pass 'results' here for the cleanup, as the response would then contain a
// 'success' element, which the user may interpret as a positive ack
CollectionHandlingUtils.cleanupCollection(collectionName, new NamedList<>(), ccc);
log.info("Cleaned up artifacts for failed create collection for [{}]", collectionName);
throw new SolrException(
ErrorCode.BAD_REQUEST,
"Underlying core creation failed while creating collection: " + collectionName);
} else {
ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
log.debug("Finished create command on all shards for collection: {}", collectionName);
// Emit a warning about production use of data driven functionality
// Note: an auto-generated configset (isAutoGeneratedConfigSet) is always a clone of the _default configset
boolean defaultConfigSetUsed =
message.getStr(COLL_CONF) == null
|| message.getStr(COLL_CONF).equals(DEFAULT_CONFIGSET_NAME)
|| ConfigSetsHandler.isAutoGeneratedConfigSet(message.getStr(COLL_CONF));
if (defaultConfigSetUsed) {
results.add(
"warning",
"Using _default configset. Data driven schema functionality"
+ " is enabled by default, which is NOT RECOMMENDED for production use. To turn it off:"
+ " curl http://{host:port}/solr/"
+ collectionName
+ "/config -d '{\"set-user-property\": {\"update.autoCreateFields\":\"false\"}}'");
}
}
// create an alias pointing to the new collection, if different from the collectionName
if (!alias.equals(collectionName)) {
ccc.getZkStateReader()
.aliasesManager
.applyModificationAndExportToZk(a -> a.cloneWithCollectionAlias(alias, collectionName));
}
} catch (SolrException ex) {
throw ex;
} catch (Exception ex) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, ex);
}
}