public Queue newQueryPlan()

in core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java [132:240]


  /**
   * Builds the query plan for a request, placing local replicas first and, among them, pushing
   * back the ones that look unhealthy or that were marked UP only recently.
   *
   * <p>When {@code avoidSlowReplicas} is {@code false}, this simply delegates to the superclass.
   * Otherwise the plan is computed as follows:
   *
   * <ol>
   *   <li>a snapshot of the live nodes of the local datacenter is taken;
   *   <li>the nodes of the snapshot that are replicas for the request are moved to the front;
   *   <li>if there is more than one such replica, the replicas are shuffled;
   *   <li>if there are more than two, either the unhealthy replicas (when they are a strict
   *       minority and no replica is newly UP) are moved to the back of the replicas, or the most
   *       recently UP replica, if it sits in first or second position, is sent to the back of the
   *       replicas unless {@code diceRoll1d4()} returns 1; then the first two replicas are ordered
   *       by their number of in-flight requests;
   *   <li>the remaining (non-replica) nodes are rotated by the current round-robin amount.
   * </ol>
   *
   * @param request the request to build a plan for; may be {@code null}.
   * @param session the session executing the request; may be {@code null}, but is asserted to be
   *     non-null as soon as more than two local replicas were found.
   * @return the plan produced by {@code maybeAddDcFailover} from the reordered local nodes (or from
   *     {@code QueryPlan.EMPTY} when there is no live local node).
   */
  public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session session) {
    if (!avoidSlowReplicas) {
      return super.newQueryPlan(request, session);
    }

    // Take a snapshot since the set is concurrent:
    Object[] currentNodes = getLiveNodes().dc(getLocalDatacenter()).toArray();

    Set<Node> allReplicas = getReplicas(request, session);
    int replicaCount = 0; // in currentNodes

    if (!allReplicas.isEmpty()) {

      // Move replicas to the beginning of the plan
      for (int i = 0; i < currentNodes.length; i++) {
        Node node = (Node) currentNodes[i];
        if (allReplicas.contains(node)) {
          ArrayUtils.bubbleUp(currentNodes, i, replicaCount);
          replicaCount++;
        }
      }

      if (replicaCount > 1) {

        shuffleHead(currentNodes, replicaCount);

        if (replicaCount > 2) {

          assert session != null;

          // Test replicas health
          Node newestUpReplica = null;
          BitSet unhealthyReplicas = null; // bit mask storing indices of unhealthy replicas
          // Only meaningful once newestUpReplica != null. A sentinel such as -1 cannot stand for
          // "none seen yet": nanoTime-style timestamps have an arbitrary origin and may be
          // negative, in which case comparing against the sentinel would reject every candidate.
          // NOTE(review): assumes upTimes values come from the same clock as nanoTime().
          long mostRecentUpTimeNanos = 0;
          long now = nanoTime();
          for (int i = 0; i < replicaCount; i++) {
            Node node = (Node) currentNodes[i];
            assert node != null;
            Long upTimeNanos = upTimes.get(node);
            // Differences (rather than direct comparisons) keep the checks valid if the
            // nanosecond counter wraps around.
            if (upTimeNanos != null
                && now - upTimeNanos - NEWLY_UP_INTERVAL_NANOS < 0
                && (newestUpReplica == null || upTimeNanos - mostRecentUpTimeNanos > 0)) {
              newestUpReplica = node;
              mostRecentUpTimeNanos = upTimeNanos;
            }
            // Unhealthy replicas are only acted upon when no replica is newly UP, so the health
            // check is skipped as soon as one was found.
            if (newestUpReplica == null && isUnhealthy(node, session, now)) {
              if (unhealthyReplicas == null) {
                unhealthyReplicas = new BitSet(replicaCount);
              }
              unhealthyReplicas.set(i);
            }
          }

          // When:
          // - there isn't any newly UP replica and
          // - there is one or more unhealthy replicas and
          // - there is a majority of healthy replicas
          int unhealthyReplicasCount =
              unhealthyReplicas == null ? 0 : unhealthyReplicas.cardinality();
          if (newestUpReplica == null
              && unhealthyReplicasCount > 0
              && unhealthyReplicasCount < (replicaCount / 2.0)) {

            // Reorder the unhealthy replicas to the back of the list
            // Start from the back of the replicas, then move backwards;
            // stop once all unhealthy replicas are moved to the back.
            int counter = 0;
            for (int i = replicaCount - 1; i >= 0 && counter < unhealthyReplicasCount; i--) {
              if (unhealthyReplicas.get(i)) {
                ArrayUtils.bubbleDown(currentNodes, i, replicaCount - 1 - counter);
                counter++;
              }
            }
          }

          // When:
          // - there is a newly UP replica and
          // - the replica in first or second position is the most recent replica marked as UP and
          // - dice roll 1d4 != 1
          else if ((newestUpReplica == currentNodes[0] || newestUpReplica == currentNodes[1])
              && diceRoll1d4() != 1) {

            // Send it to the back of the replicas
            ArrayUtils.bubbleDown(
                currentNodes, newestUpReplica == currentNodes[0] ? 0 : 1, replicaCount - 1);
          }

          // Reorder the first two replicas in the shuffled list based on the number of
          // in-flight requests
          if (getInFlight((Node) currentNodes[0], session)
              > getInFlight((Node) currentNodes[1], session)) {
            ArrayUtils.swap(currentNodes, 0, 1);
          }
        }
      }
    }

    LOG.trace("[{}] Prioritizing {} local replicas", logPrefix, replicaCount);

    // Round-robin the remaining nodes
    ArrayUtils.rotate(
        currentNodes,
        replicaCount,
        currentNodes.length - replicaCount,
        roundRobinAmount.getAndUpdate(INCREMENT));

    QueryPlan plan = currentNodes.length == 0 ? QueryPlan.EMPTY : new SimpleQueryPlan(currentNodes);
    return maybeAddDcFailover(request, plan);
  }