public void call()

in solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java [170:591]
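
Overview: the command validates the request and optionally resolves aliases, serves the ABORT and STATUS sub-commands, and for START creates the target and checkpoint collections, puts the source collection into read-only mode, copies documents with a streaming daemon() expression, optionally aliases the original name to the new collection and/or deletes the source, and rolls back on error or abort.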


  public void call(ClusterState clusterState, ZkNodeProps message, NamedList<Object> results)
      throws Exception {

    log.debug("*** called: {}", message);

    String extCollection = message.getStr(CommonParams.NAME);

    if (extCollection == null) {
      throw new SolrException(
          SolrException.ErrorCode.BAD_REQUEST, "Source collection name must be specified");
    }
    boolean followAliases = message.getBool(FOLLOW_ALIASES, false);
    String collection;
    if (followAliases) {
      collection =
          ccc.getSolrCloudManager().getClusterStateProvider().resolveSimpleAlias(extCollection);
    } else {
      collection = extCollection;
    }
    if (!clusterState.hasCollection(collection)) {
      throw new SolrException(
          SolrException.ErrorCode.BAD_REQUEST, "Source collection name must exist");
    }
    String target = message.getStr(TARGET);
    if (target == null) {
      target = collection;
    } else {
      if (followAliases) {
        target = ccc.getSolrCloudManager().getClusterStateProvider().resolveSimpleAlias(target);
      }
    }
    boolean sameTarget = target.equals(collection) || target.equals(extCollection);
    boolean removeSource = message.getBool(REMOVE_SOURCE, false);
    Cmd command = Cmd.get(message.getStr(COMMAND, Cmd.START.toLower()));
    if (command == null) {
      throw new SolrException(
          SolrException.ErrorCode.BAD_REQUEST, "Unknown command: " + message.getStr(COMMAND));
    }
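    // load any previously persisted reindexing state for this collection
    // (kept in the cluster's distributed state via the DistribStateManager)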
    Map<String, Object> reindexingState =
        getReindexingState(ccc.getSolrCloudManager().getDistribStateManager(), collection);
    if (!reindexingState.containsKey(STATE)) {
      reindexingState.put(STATE, State.IDLE.toLower());
    }
    State state = State.get(reindexingState.get(STATE));
    if (command == Cmd.ABORT) {
      log.info("Abort requested for collection {}, setting the state to ABORTED.", collection);
      // check that it's running
      if (state != State.RUNNING) {
        log.debug(
            "Abort requested for collection {} but command is not running: {}", collection, state);
        return;
      }
      setReindexingState(collection, State.ABORTED, null);
      reindexingState.put(STATE, "aborting");
      results.add(REINDEX_STATUS, reindexingState);
      // if needed the cleanup will be performed by the running instance of the command
      return;
    } else if (command == Cmd.STATUS) {
      results.add(REINDEX_STATUS, reindexingState);
      return;
    }
    // command == Cmd.START

    // check it's not already running
    if (state == State.RUNNING) {
      throw new SolrException(
          SolrException.ErrorCode.BAD_REQUEST,
          "Reindex is already running for collection "
              + collection
              + ". If you are sure this is not the case you can issue &cmd=abort to clean up this state.");
    }

    DocCollection coll = clusterState.getCollection(collection);
    boolean aborted = false;
    int batchSize = message.getInt(CommonParams.ROWS, 100);
    String query = message.getStr(CommonParams.Q, "*:*");
    String fl = message.getStr(CommonParams.FL, "*");
    Integer rf = message.getInt(ZkStateReader.REPLICATION_FACTOR, coll.getReplicationFactor());
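    // per-type replica counts (nrt/tlog/pull), from the request or falling back to the source collection's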
    ReplicaCount numReplicas = ReplicaCount.fromMessage(message, coll);
    int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, coll.getActiveSlices().size());
    DocRouter router = coll.getRouter();
    if (router == null) {
      router = DocRouter.DEFAULT;
    }

    String configName = message.getStr(ZkStateReader.CONFIGNAME_PROP, coll.getConfigName());
    String targetCollection;
    int seq = tmpCollectionSeq.getAndIncrement();
    if (sameTarget) {
      // in-place reindex: pick a temporary target name that does not clash with an existing collection
      targetCollection = TARGET_COL_PREFIX + extCollection + "_" + seq;
      while (clusterState.hasCollection(targetCollection)) {
        seq = tmpCollectionSeq.getAndIncrement();
        targetCollection = TARGET_COL_PREFIX + extCollection + "_" + seq;
      }
    } else {
      targetCollection = target;
    }
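    // the checkpoint collection stores the topic() stream's checkpoints so the copy can track progress and resume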
    String chkCollection = CHK_COL_PREFIX + extCollection;
    Replica daemonReplica = null;
    Exception exc = null;
    boolean createdTarget = false;
    try {
      // set the running flag
      reindexingState.clear();
      reindexingState.put("actualSourceCollection", collection);
      reindexingState.put("actualTargetCollection", targetCollection);
      reindexingState.put("checkpointCollection", chkCollection);
      reindexingState.put("inputDocs", getNumberOfDocs(collection));
      reindexingState.put(PHASE, "creating target and checkpoint collections");
      setReindexingState(collection, State.RUNNING, reindexingState);

      // 0. set up target and checkpoint collections
      NamedList<Object> cmdResults = new NamedList<>();
      ZkNodeProps cmd;
      if (clusterState.hasCollection(targetCollection)) {
        throw new SolrException(
            SolrException.ErrorCode.SERVER_ERROR,
            "Target collection " + targetCollection + " already exists! Delete it first.");
      }
      if (clusterState.hasCollection(chkCollection)) {
        // delete the checkpoint collection
        cmd =
            new ZkNodeProps(
                Overseer.QUEUE_OPERATION,
                CollectionParams.CollectionAction.DELETE.toLower(),
                CommonParams.NAME,
                chkCollection);
        new DeleteCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
        CollectionHandlingUtils.checkResults(
            "deleting old checkpoint collection " + chkCollection, cmdResults, true);
      }

      if (maybeAbort(collection)) {
        aborted = true;
        return;
      }

      Map<String, Object> propMap = new HashMap<>();
      propMap.put(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower());
      propMap.put(CommonParams.NAME, targetCollection);
      propMap.put(ZkStateReader.NUM_SHARDS_PROP, numShards);
      propMap.put(CollectionAdminParams.COLL_CONF, configName);
      // initialize from the source collection's router settings
      propMap.put("router.name", router.getName());
      for (String key : coll.keySet()) {
        if (key.startsWith("router.")) {
          propMap.put(key, coll.get(key));
        }
      }
      // then apply overrides if present
      for (String key : message.keySet()) {
        if (key.startsWith("router.")) {
          propMap.put(key, message.getStr(key));
        } else if (COLLECTION_PARAMS.contains(key)) {
          propMap.put(key, message.get(key));
        }
      }

      propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, true); // block until all target replicas are active
      if (rf != null) {
        propMap.put(ZkStateReader.REPLICATION_FACTOR, rf);
      }
      numReplicas.writeProps(propMap);
      // create the target collection
      cmd = new ZkNodeProps(propMap);
      cmdResults = new NamedList<>();
      new CreateCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
      createdTarget = true;
      CollectionHandlingUtils.checkResults(
          "creating target collection " + targetCollection, cmdResults, true);

      // create the checkpoint collection - use RF=1 and 1 shard
      cmd =
          new ZkNodeProps(
              Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
              CommonParams.NAME, chkCollection,
              ZkStateReader.NUM_SHARDS_PROP, "1",
              ZkStateReader.REPLICATION_FACTOR, "1",
              CollectionAdminParams.COLL_CONF, "_default",
              CommonAdminParams.WAIT_FOR_FINAL_STATE, "true");
      cmdResults = new NamedList<>();
      new CreateCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
      CollectionHandlingUtils.checkResults(
          "creating checkpoint collection " + chkCollection, cmdResults, true);
      // wait until both new collections are visible in the cluster state
      try {
        for (String col : List.of(targetCollection, chkCollection)) {
          ccc.getZkStateReader().waitForState(col, 30, TimeUnit.SECONDS, Objects::nonNull);
        }
      } catch (TimeoutException e) {
        throw new SolrException(
            SolrException.ErrorCode.SERVER_ERROR, "Could not fully create temporary collection(s)");
      }

      clusterState = ccc.getSolrCloudManager().getClusterState();

      if (maybeAbort(collection)) {
        aborted = true;
        return;
      }

      // 1. put the source collection in read-only mode
      cmd =
          new ZkNodeProps(
              Overseer.QUEUE_OPERATION,
              CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
              ZkStateReader.COLLECTION_PROP,
              collection,
              ZkStateReader.READ_ONLY,
              "true");
      if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
        ccc.getDistributedClusterStateUpdater()
            .doSingleStateUpdate(
                DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection,
                cmd,
                ccc.getSolrCloudManager(),
                ccc.getZkStateReader());
      } else {
        ccc.offerStateUpdate(cmd);
      }

      TestInjection.injectReindexLatch(); // test hook: blocks here when a test has armed a reindex latch

      if (maybeAbort(collection)) {
        aborted = true;
        return;
      }

      // 2. copy the documents to target
      // Recipe taken from:
      // http://joelsolr.blogspot.com/2016/10/solr-63-batch-jobs-parallel-etl-and.html
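      // The concatenation below builds a streaming expression of this shape
      // (illustrative, with the default q/fl/rows values substituted):
      //
      //   daemon(id="<targetCollection>", terminate="true",
      //     commit(<targetCollection>,
      //       update(<targetCollection>, batchSize=100,
      //         topic(<chkCollection>, <collection>, q="*:*", fl="*",
      //               id="topic_<targetCollection>", rows="100",
      //               initialCheckpoint="0"))))
      //
      // i.e. a daemon running on a source replica that repeatedly pulls batches
      // from the source via topic(), indexes them into the target via update(),
      // commits, and (terminate="true") shuts itself down once topic() finds no
      // more documents.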
      ModifiableSolrParams q = new ModifiableSolrParams();
      q.set(CommonParams.QT, "/stream");
      q.set("collection", collection);
      q.set(
          "expr",
          "daemon(id=\""
              + targetCollection
              + "\","
              + "terminate=\"true\","
              + "commit("
              + targetCollection
              + ","
              + "update("
              + targetCollection
              + ","
              + "batchSize="
              + batchSize
              + ","
              + "topic("
              + chkCollection
              + ","
              + collection
              + ","
              + "q=\""
              + query
              + "\","
              + "fl=\""
              + fl
              + "\","
              + "id=\"topic_"
              + targetCollection
              + "\","
              +
              // cap rows per topic() call at batchSize: some documents (e.g. in .system) contain large blobs
              "rows=\""
              + batchSize
              + "\","
              + "initialCheckpoint=\"0\"))))");
      log.debug("- starting copying documents from {} to {}", collection, targetCollection);
      SolrResponse rsp;
      try {
        rsp = new QueryRequest(q).process(ccc.getSolrCloudManager().getSolrClient());
      } catch (Exception e) {
        throw new SolrException(
            SolrException.ErrorCode.SERVER_ERROR,
            "Unable to copy documents from " + collection + " to " + targetCollection,
            e);
      }
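      // locate the source replica hosting the daemon so its progress can be polled (and the daemon stopped on abort)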
      daemonReplica = getReplicaForDaemon(rsp, coll);
      if (daemonReplica == null) {
        throw new SolrException(
            SolrException.ErrorCode.SERVER_ERROR,
            "Unable to copy documents from "
                + collection
                + " to "
                + targetCollection
                + ": "
                + Utils.toJSONString(rsp));
      }
      reindexingState.put("daemonUrl", daemonReplica.getCoreUrl());
      reindexingState.put("daemonName", targetCollection);
      reindexingState.put(PHASE, "copying documents");
      setReindexingState(collection, State.RUNNING, reindexingState);

      // wait for the daemon to finish
      waitForDaemon(targetCollection, daemonReplica, collection, targetCollection, reindexingState);
      if (maybeAbort(collection)) {
        aborted = true;
        return;
      }
      log.debug("- finished copying from {} to {}", collection, targetCollection);
      // test hook: may fail here, or earlier during the daemon run
      TestInjection.injectReindexFailure();

      // 5. if (sameTarget) set up an alias to use targetCollection as the source name
      if (sameTarget) {
        log.debug("- setting up alias from {} to {}", extCollection, targetCollection);
        cmd = new ZkNodeProps(CommonParams.NAME, extCollection, "collections", targetCollection);
        cmdResults = new NamedList<>();
        new CreateAliasCmd(ccc).call(clusterState, cmd, cmdResults);
        CollectionHandlingUtils.checkResults(
            "setting up alias " + extCollection + " -> " + targetCollection, cmdResults, true);
        reindexingState.put("alias", extCollection + " -> " + targetCollection);
      }

      reindexingState.remove("daemonUrl");
      reindexingState.remove("daemonName");
      reindexingState.put("processedDocs", getNumberOfDocs(targetCollection));
      reindexingState.put(PHASE, "copying done, finalizing");
      setReindexingState(collection, State.RUNNING, reindexingState);

      if (maybeAbort(collection)) {
        aborted = true;
        return;
      }
      // 6. delete the checkpoint collection
      log.debug("- deleting {}", chkCollection);
      cmd =
          new ZkNodeProps(
              Overseer.QUEUE_OPERATION,
              CollectionParams.CollectionAction.DELETE.toLower(),
              CommonParams.NAME,
              chkCollection);
      cmdResults = new NamedList<>();
      new DeleteCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
      CollectionHandlingUtils.checkResults(
          "deleting checkpoint collection " + chkCollection, cmdResults, true);

      // 7. optionally delete the source collection
      if (removeSource) {
        log.debug("- deleting source collection");
        cmd =
            new ZkNodeProps(
                Overseer.QUEUE_OPERATION,
                CollectionParams.CollectionAction.DELETE.toLower(),
                CommonParams.NAME,
                collection,
                FOLLOW_ALIASES,
                "false");
        cmdResults = new NamedList<>();
        new DeleteCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
        CollectionHandlingUtils.checkResults(
            "deleting source collection " + collection, cmdResults, true);
      } else {
        // 8. clear readOnly on source
        ZkNodeProps props =
            new ZkNodeProps(
                Overseer.QUEUE_OPERATION,
                CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
                ZkStateReader.COLLECTION_PROP,
                collection,
                ZkStateReader.READ_ONLY,
                null);
        if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
          ccc.getDistributedClusterStateUpdater()
              .doSingleStateUpdate(
                  DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection,
                  props,
                  ccc.getSolrCloudManager(),
                  ccc.getZkStateReader());
        } else {
          ccc.offerStateUpdate(props);
        }
      }
      // 9. set FINISHED state on the target and clear the state on the source
      ZkNodeProps props =
          new ZkNodeProps(
              Overseer.QUEUE_OPERATION,
              CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
              ZkStateReader.COLLECTION_PROP,
              targetCollection,
              REINDEXING_STATE,
              State.FINISHED.toLower());
      if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
        ccc.getDistributedClusterStateUpdater()
            .doSingleStateUpdate(
                DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection,
                props,
                ccc.getSolrCloudManager(),
                ccc.getZkStateReader());
      } else {
        ccc.offerStateUpdate(props);
      }

      reindexingState.put(STATE, State.FINISHED.toLower());
      reindexingState.put(PHASE, "done");
      removeReindexingState(collection);
    } catch (Exception e) {
      log.warn("Error during reindexing of {}", extCollection, e);
      exc = e;
      aborted = true;
    } finally {
      if (aborted) {
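        // roll back what was done so far (stop the daemon, drop the temporary collections, restore the source)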
        cleanup(
            collection,
            targetCollection,
            chkCollection,
            daemonReplica,
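            // the daemon name: the daemon id was set to targetCollection in daemon(id=...) above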
            targetCollection,
            createdTarget);
        if (exc != null) {
          results.add("error", exc.toString());
        }
        reindexingState.put(STATE, State.ABORTED.toLower());
      }
      results.add(REINDEX_STATUS, reindexingState);
    }
  }
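
For context, this command backs the Collections API REINDEXCOLLECTION action; the request parameters map directly onto the message keys read above (name, target, q, fl, rows, removeSource, cmd, followAliases, router.*). Below is a minimal SolrJ sketch of driving it. The collection names and ZooKeeper address are illustrative, and it assumes SolrJ's CollectionAdminRequest.ReindexCollection helper and a ZK-aware CloudSolrClient.Builder (client construction details vary across SolrJ versions):

  import java.util.List;
  import java.util.Optional;
  import org.apache.solr.client.solrj.impl.CloudSolrClient;
  import org.apache.solr.client.solrj.request.CollectionAdminRequest;

  public class ReindexExample {
    public static void main(String[] args) throws Exception {
      // collection names and ZK address are illustrative
      try (CloudSolrClient client =
          new CloudSolrClient.Builder(List.of("zk1:2181"), Optional.empty()).build()) {
        // cmd defaults to "start" (Cmd.START above)
        CollectionAdminRequest.ReindexCollection start =
            CollectionAdminRequest.reindexCollection("books")
                .setTarget("books_v2")   // TARGET; omit to reindex in place (temp collection + alias)
                .setQuery("*:*")         // CommonParams.Q (default shown)
                .setRemoveSource(false); // REMOVE_SOURCE
        start.process(client);

        // cmd=status returns the reindexingState map assembled by this command
        CollectionAdminRequest.ReindexCollection status =
            CollectionAdminRequest.reindexCollection("books").setCommand("status");
        System.out.println(status.process(client).getResponse());
      }
    }
  }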