static boolean migrateReplicas()

in solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java [65:246]


  /**
   * Creates a copy of each source replica on its target node, waits for the copies, and then
   * removes the original replicas.
   *
   * <p>For every movement an add-replica request (built from the source replica's full props plus
   * the target node) is issued. If the source replica is a leader, or if {@code
   * waitForFinalState} is true, a collection state watcher is registered for the new replica. The
   * watcher counts down a recovery latch, so a leader is not removed before its replacement can
   * take over.
   *
   * <p>If any replica creation reports a failure, or either wait times out, every replica created
   * by this call is deleted again (best effort) and {@code false} is returned. Otherwise the
   * original replicas are removed via {@code cleanupReplicas}, whose result is returned.
   *
   * @param ccc command context providing ZooKeeper access and the closeable the latches observe
   * @param movements each source replica mapped to the name of the node it should be moved to
   * @param parallel passed to the add-replica request as the {@code parallel} property
   * @param waitForFinalState if true, register a watcher (and wait) for every new replica, not
   *     only for replacements of leaders
   * @param timeout seconds to wait for replica creation, and then separately for recovery
   * @param asyncId async id forwarded to the add-replica requests and to cleanup; may be null
   * @param results response list; failure messages are appended under {@code "failure"}
   * @return true if all replicas were created and the originals cleaned up, false otherwise
   */
  static boolean migrateReplicas(
      CollectionCommandContext ccc,
      Map<Replica, String> movements,
      boolean parallel,
      boolean waitForFinalState,
      int timeout,
      String asyncId,
      NamedList<Object> results)
      throws IOException, InterruptedException, KeeperException {
    // how many leaders are we moving? for these replicas we have to make sure that either:
    // * another existing replica can become a leader, or
    // * we wait until the newly created replica completes recovery (and can become the new leader)
    // If waitForFinalState=true we wait for all replicas
    int numLeaders = 0;
    for (Replica replica : movements.keySet()) {
      if (replica.isLeader() || waitForFinalState) {
        numLeaders++;
      }
    }
    // Registered watchers, each paired with the collection name it was registered under.
    // removeCollectionStateWatcher must be given that same collection name; a derived
    // "collection_replica" key would not match and the watcher would never be removed.
    List<Map.Entry<String, CollectionStateWatcher>> watchers = new ArrayList<>();
    List<ZkNodeProps> createdReplicas = new ArrayList<>();

    AtomicBoolean anyOneFailed = new AtomicBoolean(false);
    SolrCloseableLatch countDownLatch =
        new SolrCloseableLatch(movements.size(), ccc.getCloseableToLatchOn());

    SolrCloseableLatch replicasToRecover =
        new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn());

    ClusterState clusterState = ccc.getZkStateReader().getClusterState();

    try {
      for (Map.Entry<Replica, String> movement : movements.entrySet()) {
        Replica sourceReplica = movement.getKey();
        String targetNode = movement.getValue();
        String sourceCollection = sourceReplica.getCollection();
        if (log.isInfoEnabled()) {
          log.info(
              "Going to create replica for collection={} shard={} on node={}",
              sourceCollection,
              sourceReplica.getShard(),
              targetNode);
        }

        ZkNodeProps msg =
            sourceReplica
                .toFullProps()
                .plus("parallel", String.valueOf(parallel))
                .plus(CoreAdminParams.NODE, targetNode);
        if (asyncId != null) msg.getProperties().put(ASYNC, asyncId);
        NamedList<Object> nl = new NamedList<>();
        final ZkNodeProps addedReplica =
            new AddReplicaCmd(ccc)
                .addReplica(
                    clusterState,
                    msg,
                    nl,
                    () -> {
                      countDownLatch.countDown();
                      if (nl.get("failure") != null) {
                        String errorString =
                            String.format(
                                Locale.ROOT,
                                "Failed to create replica for collection=%s shard=%s on node=%s",
                                sourceCollection,
                                sourceReplica.getShard(),
                                targetNode);
                        log.warn(errorString);
                        // one replica creation failed. Make the best attempt to
                        // delete all the replicas created so far in the target
                        // and exit
                        synchronized (results) {
                          results.add("failure", errorString);
                          anyOneFailed.set(true);
                        }
                      } else {
                        if (log.isDebugEnabled()) {
                          log.debug(
                              "Successfully created replica for collection={} shard={} on node={}",
                              sourceCollection,
                              sourceReplica.getShard(),
                              targetNode);
                        }
                      }
                    })
                .get(0);

        if (addedReplica != null) {
          createdReplicas.add(addedReplica);
          if (sourceReplica.isLeader() || waitForFinalState) {
            String shardName = sourceReplica.getShard();
            String replicaName = sourceReplica.getName();
            String key = sourceCollection + "_" + replicaName;
            CollectionStateWatcher watcher;
            if (waitForFinalState) {
              watcher =
                  new ActiveReplicaWatcher(
                      sourceCollection,
                      null,
                      Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)),
                      replicasToRecover);
            } else {
              watcher =
                  new LeaderRecoveryWatcher(
                      sourceCollection,
                      shardName,
                      replicaName,
                      addedReplica.getStr(ZkStateReader.CORE_NAME_PROP),
                      replicasToRecover);
            }
            watchers.add(Map.entry(sourceCollection, watcher));
            log.debug("--- adding {}, {}", key, watcher);
            ccc.getZkStateReader().registerCollectionStateWatcher(sourceCollection, watcher);
          } else {
            log.debug("--- not waiting for {}", addedReplica);
          }
        }
      }

      log.debug("Waiting for replicas to be added");
      if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
        log.info("Timed out waiting for replicas to be added");
        anyOneFailed.set(true);
      } else {
        log.debug("Finished waiting for replicas to be added");
      }

      // now wait for leader replicas to recover
      log.debug("Waiting for {} leader replicas to recover", numLeaders);
      if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
        if (log.isInfoEnabled()) {
          log.info(
              "Timed out waiting for {} leader replicas to recover", replicasToRecover.getCount());
        }
        anyOneFailed.set(true);
      } else {
        log.debug("Finished waiting for leader replicas to recover");
      }
    } finally {
      // remove the watchers, we're done either way - also when an exception or interrupt
      // escapes above, so no watcher stays registered on the collection
      for (Map.Entry<String, CollectionStateWatcher> e : watchers) {
        ccc.getZkStateReader().removeCollectionStateWatcher(e.getKey(), e.getValue());
      }
    }
    if (anyOneFailed.get()) {
      log.info("Failed to create some replicas. Cleaning up all newly created replicas.");
      SolrCloseableLatch cleanupLatch =
          new SolrCloseableLatch(createdReplicas.size(), ccc.getCloseableToLatchOn());
      for (ZkNodeProps createdReplica : createdReplicas) {
        NamedList<Object> deleteResult = new NamedList<>();
        try {
          new DeleteReplicaCmd(ccc)
              .deleteReplica(
                  ccc.getZkStateReader().getClusterState(),
                  createdReplica.plus("parallel", "true"),
                  deleteResult,
                  () -> {
                    cleanupLatch.countDown();
                    if (deleteResult.get("failure") != null) {
                      synchronized (results) {
                        results.add(
                            "failure",
                            "Could not cleanup, because of : " + deleteResult.get("failure"));
                      }
                    }
                  });
        } catch (KeeperException e) {
          cleanupLatch.countDown();
          log.warn("Error deleting replica ", e);
        } catch (Exception e) {
          log.warn("Error deleting replica ", e);
          cleanupLatch.countDown();
          throw e;
        }
      }
      // best effort: do not fail the call on a slow cleanup, but make the timeout visible
      if (!cleanupLatch.await(5, TimeUnit.MINUTES)) {
        log.warn("Timed out waiting for cleanup of newly created replicas");
      }
      return false;
    }

    // we have reached this far, meaning all replicas should have been recreated.
    // now cleanup the original replicas
    return cleanupReplicas(
        results, ccc.getZkStateReader().getClusterState(), movements.keySet(), ccc, asyncId);
  }