public void call()

in solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java [107:488]


  public void call(ClusterState clusterState, ZkNodeProps message, NamedList<Object> results)
      throws Exception {
    if (ccc.getZkStateReader().aliasesManager != null) { // not a mock ZkStateReader
      ccc.getZkStateReader().aliasesManager.update();
    }
    final Aliases aliases = ccc.getZkStateReader().getAliases();
    final String collectionName = message.getStr(NAME);
    final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
    final String alias = message.getStr(ALIAS, collectionName);
    log.info("Create collection {}", collectionName);
    boolean prsDefault = EnvUtils.getPropertyAsBool(PRS_DEFAULT_PROP, false);
    final boolean isPRS = message.getBool(CollectionStateProps.PER_REPLICA_STATE, prsDefault);
    if (log.isInfoEnabled()) {
      log.info(
          "solr.prs.default : {} and collection prs : {}, isPRS : {}",
          System.getProperty("solr.prs.default", null),
          message.getStr(CollectionStateProps.PER_REPLICA_STATE),
          isPRS);
    }

    if (clusterState.hasCollection(collectionName)) {
      throw new SolrException(
          SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
    }
    if (aliases.hasAlias(collectionName)) {
      throw new SolrException(
          SolrException.ErrorCode.BAD_REQUEST,
          "collection alias already exists: " + collectionName);
    }

    String configName = getConfigName(collectionName, message);
    if (configName == null) {
      throw new SolrException(
          SolrException.ErrorCode.BAD_REQUEST,
          "No config set found to associate with the collection.");
    }

    CollectionHandlingUtils.validateConfigOrThrowSolrException(
        ccc.getCoreContainer().getConfigSetService(), configName);

    String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);

    // fail fast if parameters are wrong or incomplete
    List<String> shardNames = populateShardNames(message, router);
    ReplicaCount numReplicas = getNumReplicas(message);

    DocCollection newColl = null;
    final String collectionPath = DocCollection.getCollectionPath(collectionName);

    try {

      final String async = message.getStr(ASYNC);

      ZkStateReader zkStateReader = ccc.getZkStateReader();
      message.getProperties().put(COLL_CONF, configName);

      Map<String, String> collectionParams = new HashMap<>();
      Map<String, Object> collectionProps = message.getProperties();
      for (Map.Entry<String, Object> entry : collectionProps.entrySet()) {
        String propName = entry.getKey();
        if (propName.startsWith(ZkController.COLLECTION_PARAM_PREFIX)) {
          collectionParams.put(
              propName.substring(ZkController.COLLECTION_PARAM_PREFIX.length()),
              (String) entry.getValue());
        }
      }

      createCollectionZkNode(
          ccc.getSolrCloudManager().getDistribStateManager(),
          collectionName,
          collectionParams,
          ccc.getCoreContainer().getConfigSetService());

      // Note that in code below there are two main execution paths: Overseer based cluster state
      // updates and distributed cluster state updates (look for isDistributedStateUpdate()
      // conditions).
      //
      // PerReplicaStates (PRS) collections follow a hybrid approach. Even when the cluster is
      // Overseer cluster state update based, these collections are created locally then the cluster
      // state updater is notified (look for usage of RefreshCollectionMessage). This explains why
      // PRS collections have less diverging execution paths between distributed or Overseer based
      // cluster state updates.

      if (isPRS) {
        // In case of a PRS collection, create the collection structure directly instead of
        // resubmitting to the overseer queue.
        // TODO: Consider doing this for all collections, not just the PRS collections.
        // TODO comment above achieved by switching the cluster to distributed state updates

        // This code directly updates Zookeeper by creating the collection state.json. It is
        // compatible with both distributed cluster state updates and Overseer based cluster state
        // updates.

        // TODO: Consider doing this for all collections, not just the PRS collections.
        ZkWriteCommand command =
            new ClusterStateMutator(ccc.getSolrCloudManager())
                .createCollection(clusterState, message);
        byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
        ccc.getZkStateReader()
            .getZkClient()
            .create(collectionPath, data, CreateMode.PERSISTENT, true);
        clusterState = clusterState.copyWith(collectionName, command.collection);
        newColl = command.collection;
        ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
      } else {
        if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
          // The message has been crafted by CollectionsHandler.CollectionOperation.CREATE_OP and
          // defines the QUEUE_OPERATION to be CollectionParams.CollectionAction.CREATE.
          ccc.getDistributedClusterStateUpdater()
              .doSingleStateUpdate(
                  DistributedClusterStateUpdater.MutatingCommand.ClusterCreateCollection,
                  message,
                  ccc.getSolrCloudManager(),
                  ccc.getZkStateReader());
        } else {
          ccc.offerStateUpdate(Utils.toJSON(message));
        }

        // wait for a while until we see the collection
        try {
          newColl =
              zkStateReader.waitForState(collectionName, 30, TimeUnit.SECONDS, Objects::nonNull);
        } catch (TimeoutException e) {
          throw new SolrException(
              SolrException.ErrorCode.SERVER_ERROR,
              "Could not fully create collection: " + collectionName,
              e);
        }

        // refresh cluster state (value read below comes from Zookeeper watch firing following the
        // update done previously, be it by Overseer or by this thread when updates are distributed)
        clusterState = ccc.getSolrCloudManager().getClusterState();
      }

      final List<ReplicaPosition> replicaPositions;
      try {
        replicaPositions =
            buildReplicaPositions(
                ccc.getCoreContainer(),
                ccc.getSolrCloudManager(),
                clusterState,
                message,
                shardNames,
                numReplicas);
      } catch (Assign.AssignmentException e) {
        ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName);
        new DeleteCollectionCmd(ccc).call(clusterState, deleteMessage, results);
        // unwrap the exception
        throw new SolrException(ErrorCode.BAD_REQUEST, e.getMessage(), e.getCause());
      }

      if (replicaPositions.isEmpty()) {
        log.debug("Finished create command for collection: {}", collectionName);
        return;
      }

      final ShardRequestTracker shardRequestTracker =
          CollectionHandlingUtils.asyncRequestTracker(async, ccc);
      if (log.isDebugEnabled()) {
        log.debug(
            formatString(
                "Creating SolrCores for new collection {0}, shardNames {1} , message : {2}",
                collectionName, shardNames, message));
      }
      Map<String, ShardRequest> coresToCreate = new LinkedHashMap<>();
      ShardHandler shardHandler = ccc.newShardHandler();
      final DistributedClusterStateUpdater.StateChangeRecorder scr;

      // PRS collections update Zookeeper directly, so even if we run in distributed state update,
      // there's nothing to update in state.json for such collection in the loop over replica
      // positions below.
      if (!isPRS && ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
        // The collection got created. Now we're adding replicas (and will update ZK only once when
        // done adding).
        scr =
            ccc.getDistributedClusterStateUpdater()
                .createStateChangeRecorder(collectionName, false);
        ;
      } else {
        scr = null;
      }

      for (ReplicaPosition replicaPosition : replicaPositions) {
        String nodeName = replicaPosition.node;

        String coreName =
            Assign.buildSolrCoreName(
                ccc.getSolrCloudManager().getDistribStateManager(),
                collectionName,
                replicaPosition.shard,
                replicaPosition.type);
        if (log.isDebugEnabled()) {
          log.debug(
              formatString(
                  "Creating core {0} as part of shard {1} of collection {2} on {3}",
                  coreName, replicaPosition.shard, collectionName, nodeName));
        }

        String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
        // create the replica in the collection's state.json in ZK prior to creating the core.
        // Otherwise the core creation fails
        ZkNodeProps props =
            new ZkNodeProps(
                Overseer.QUEUE_OPERATION,
                ADDREPLICA.toString(),
                ZkStateReader.COLLECTION_PROP,
                collectionName,
                ZkStateReader.SHARD_ID_PROP,
                replicaPosition.shard,
                ZkStateReader.CORE_NAME_PROP,
                coreName,
                ZkStateReader.STATE_PROP,
                Replica.State.DOWN.toString(),
                ZkStateReader.NODE_NAME_PROP,
                nodeName,
                ZkStateReader.BASE_URL_PROP,
                baseUrl,
                ZkStateReader.REPLICA_TYPE,
                replicaPosition.type.name(),
                CommonAdminParams.WAIT_FOR_FINAL_STATE,
                Boolean.toString(waitForFinalState));
        if (isPRS) {
          // In case of a PRS collection, execute the ADDREPLICA directly instead of resubmitting
          // to the overseer queue.
          // TODO: Consider doing this for all collections, not just the PRS collections.

          // TODO: consider doing this once after the loop for all replicas rather than writing
          // state.json repeatedly
          // This PRS specific code is compatible with both Overseer and distributed cluster state
          // update strategies
          ZkWriteCommand command =
              new SliceMutator(ccc.getSolrCloudManager()).addReplica(clusterState, props);
          clusterState = clusterState.copyWith(collectionName, command.collection);
          newColl = command.collection;
        } else {
          if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
            scr.record(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, props);
          } else {
            ccc.offerStateUpdate(Utils.toJSON(props));
          }
        }

        // Need to create new params for each request
        ModifiableSolrParams params = new ModifiableSolrParams();
        params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());

        params.set(CoreAdminParams.NAME, coreName);
        params.set(COLL_CONF, configName);
        params.set(CoreAdminParams.COLLECTION, collectionName);
        params.set(CoreAdminParams.SHARD, replicaPosition.shard);
        params.set(ZkStateReader.NUM_SHARDS_PROP, shardNames.size());
        params.set(CoreAdminParams.NEW_COLLECTION, "true");
        params.set(CoreAdminParams.REPLICA_TYPE, replicaPosition.type.name());

        if (async != null) {
          String coreAdminAsyncId = async + Math.abs(System.nanoTime());
          params.add(ASYNC, coreAdminAsyncId);
          shardRequestTracker.track(nodeName, coreAdminAsyncId);
        }
        CollectionHandlingUtils.addPropertyParams(message, params);

        ShardRequest sreq = new ShardRequest();
        sreq.nodeName = nodeName;
        params.set("qt", ccc.getAdminPath());
        sreq.purpose = ShardRequest.PURPOSE_PRIVATE;
        sreq.shards = new String[] {baseUrl};
        sreq.actualShards = sreq.shards;
        sreq.params = params;

        coresToCreate.put(coreName, sreq);
      }

      // Update the state.json for PRS collection in a single operation
      if (isPRS) {
        byte[] data =
            Utils.toJSON(
                Collections.singletonMap(
                    collectionName, clusterState.getCollection(collectionName)));
        zkStateReader.getZkClient().setData(collectionPath, data, true);
      }

      // Distributed updates don't need to do anything for PRS collections that wrote state.json
      // directly. For non PRS collections, distributed updates have to be executed if that's how
      // the cluster is configured
      if (!isPRS && ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
        // Add the replicas to the collection state (all at once after the loop above)
        scr.executeStateUpdates(ccc.getSolrCloudManager(), ccc.getZkStateReader());
      }

      final Map<String, Replica> replicas;
      if (isPRS) {
        replicas = new ConcurrentHashMap<>();
        // Only the elements that were asked for...
        newColl.getSlices().stream()
            .flatMap(slice -> slice.getReplicas().stream())
            .filter(r -> coresToCreate.containsKey(r.getCoreName()))
            .forEach(r -> replicas.putIfAbsent(r.getCoreName(), r)); // ...get added to the map
        ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
      } else {
        // wait for all replica entries to be created and visible in local cluster state (updated by
        // ZK watches)
        replicas =
            CollectionHandlingUtils.waitToSeeReplicasInState(
                ccc.getZkStateReader(),
                ccc.getSolrCloudManager().getTimeSource(),
                collectionName,
                coresToCreate.keySet());
      }

      for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
        ShardRequest sreq = e.getValue();
        sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
        shardHandler.submit(sreq, sreq.shards[0], sreq.params);
      }

      shardRequestTracker.processResponses(
          results, shardHandler, false, null, Collections.emptySet());
      boolean failure =
          results.get("failure") != null
              && ((SimpleOrderedMap<?>) results.get("failure")).size() > 0;
      if (isPRS) {
        TimeOut timeout =
            new TimeOut(
                Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120),
                TimeUnit.SECONDS,
                ccc.getSolrCloudManager().getTimeSource()); // could be a big cluster
        PerReplicaStates prs =
            PerReplicaStatesOps.fetch(collectionPath, ccc.getZkStateReader().getZkClient(), null);
        while (!timeout.hasTimedOut()) {
          if (prs.allActive()) break;
          Thread.sleep(100);
          prs =
              PerReplicaStatesOps.fetch(collectionPath, ccc.getZkStateReader().getZkClient(), null);
        }
        if (prs.allActive()) {
          // we have successfully found all replicas to be ACTIVE
        } else {
          failure = true;
        }
      }
      if (failure) {
        // Let's cleanup as we hit an exception
        // We shouldn't be passing 'results' here for the cleanup as the response would then contain
        // 'success' element, which may be interpreted by the user as a positive ack
        CollectionHandlingUtils.cleanupCollection(collectionName, new NamedList<>(), ccc);
        log.info("Cleaned up artifacts for failed create collection for [{}]", collectionName);
        throw new SolrException(
            ErrorCode.BAD_REQUEST,
            "Underlying core creation failed while creating collection: " + collectionName);
      } else {
        ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
        log.debug("Finished create command on all shards for collection: {}", collectionName);
        // Emit a warning about production use of data driven functionality
        // Note: isAutoGeneratedConfigSet is always a clone of the _default configset
        boolean defaultConfigSetUsed =
            message.getStr(COLL_CONF) == null
                || message.getStr(COLL_CONF).equals(DEFAULT_CONFIGSET_NAME)
                || ConfigSetsHandler.isAutoGeneratedConfigSet(message.getStr(COLL_CONF));
        if (defaultConfigSetUsed) {
          results.add(
              "warning",
              "Using _default configset. Data driven schema functionality"
                  + " is enabled by default, which is NOT RECOMMENDED for production use. To turn it off:"
                  + " curl http://{host:port}/solr/"
                  + collectionName
                  + "/config -d '{\"set-user-property\": {\"update.autoCreateFields\":\"false\"}}'");
        }
      }

      // create an alias pointing to the new collection, if different from the collectionName
      if (!alias.equals(collectionName)) {
        ccc.getZkStateReader()
            .aliasesManager
            .applyModificationAndExportToZk(a -> a.cloneWithCollectionAlias(alias, collectionName));
      }

    } catch (SolrException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, ex);
    }
  }