helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java [212:284]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        int startIndex = computeRandomStartIndex(replica);
        for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
          Node receiver = _liveNodesList.get(index % _liveNodesList.size());
          if (receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
            receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
            receiver.nonPreferred.add(replica);
            receiver.newReplicas.add(replica);
            added = true;
            break;
          }
        }
      }
      if (!added) {
        // try adding the replica by making room for it
        added = assignOrphanByMakingRoom(replica);
      }
      if (added) {
        it.remove();
      }
    }
    if (_orphaned.size() > 0 && logger.isInfoEnabled()) {
      logger.warn("could not assign nodes to partitions: " + _orphaned);
    }
  }

  /**
   * Fallback placement for an orphaned replica. Scans the live nodes, beginning at the start
   * index computed for the replica and wrapping around the list, looking for two nodes:
   * <ul>
   * <li>a donor: holds ceiling capacity, has unused capacity, but fails
   * {@code canAddIfCapacity} for this replica;</li>
   * <li>an acceptor: holds no ceiling capacity, is exactly at capacity, and passes
   * {@code canAddIfCapacity} for this replica.</li>
   * </ul>
   * The first match of each kind in scan order is kept. When both exist, the acceptor steals
   * from the donor and takes the replica.
   * @param replica The replica to assign
   * @return true if a donor/acceptor pair was found and the steal was performed, false otherwise
   */
  private boolean assignOrphanByMakingRoom(Replica replica) {
    Node donor = null;
    Node acceptor = null;
    final int firstIndex = computeRandomStartIndex(replica);
    final int nodeCount = _liveNodesList.size();
    // one full lap around the node list, stopping early once both roles are filled
    for (int offset = 0; offset < nodeCount && (donor == null || acceptor == null); offset++) {
      Node candidate = _liveNodesList.get((firstIndex + offset) % nodeCount);
      if (candidate.hasCeilingCapacity) {
        // spare room here, yet the replica cannot be placed on this node: possible donor
        if (candidate.capacity > candidate.currentlyAssigned
            && !candidate.canAddIfCapacity(replica) && donor == null) {
          donor = candidate;
        }
      } else {
        // exactly full, but would take the replica given the capacity: possible acceptor
        if (candidate.capacity == candidate.currentlyAssigned
            && candidate.canAddIfCapacity(replica) && acceptor == null) {
          acceptor = candidate;
        }
      }
    }
    if (donor == null || acceptor == null) {
      return false;
    }
    // hand the donor's ceiling capacity to the acceptor and place the replica there
    acceptor.steal(donor, replica);
    return true;
  }

  /**
   * Move replicas from too-full nodes to nodes that can accept the replicas
   */
  private void moveExcessReplicas() {
    // iterate over nodes and move extra load
    Iterator<Replica> it;
    for (Node donor : _liveNodesList) {
      if (donor.capacity < donor.currentlyAssigned) {
        Collections.sort(donor.nonPreferred);
        it = donor.nonPreferred.iterator();
        while (it.hasNext()) {
          Replica replica = it.next();
          int startIndex = computeRandomStartIndex(replica);
          for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
            Node receiver = _liveNodesList.get(index % _liveNodesList.size());
            if (receiver.canAdd(replica)) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java [182:253]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
      int startIndex = computeRandomStartIndex(replica);
      for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
        Node receiver = _liveNodesList.get(index % _liveNodesList.size());
        if (receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
          receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
          receiver.nonPreferred.add(replica);
          receiver.newReplicas.add(replica);
          added = true;
          break;
        }
      }
      if (!added) {
        // try adding the replica by making room for it
        added = assignOrphanByMakingRoom(replica);
      }
      if (added) {
        it.remove();
      }
    }
    if (_orphaned.size() > 0 && logger.isInfoEnabled()) {
      logger.warn("could not assign nodes to partitions: " + _orphaned);
    }
  }

  /**
   * Fallback placement for an orphaned replica. Walks the live nodes once, starting from the
   * index computed for the replica and wrapping around, and remembers the first node seen in
   * each of two roles:
   * <ul>
   * <li>donor: has ceiling capacity and unused capacity, but {@code canAddIfCapacity} is false
   * for this replica;</li>
   * <li>acceptor: has no ceiling capacity, is exactly at capacity, and {@code canAddIfCapacity}
   * is true for this replica.</li>
   * </ul>
   * If both roles are filled, the acceptor steals from the donor and receives the replica.
   * @param replica The replica to assign
   * @return true if a donor/acceptor pair was found and the steal was performed, false otherwise
   */
  private boolean assignOrphanByMakingRoom(Replica replica) {
    Node giver = null;
    Node taker = null;
    final int begin = computeRandomStartIndex(replica);
    final int liveCount = _liveNodesList.size();
    // visit every live node at most once; quit as soon as both roles are filled
    for (int step = 0; step < liveCount && (giver == null || taker == null); step++) {
      Node node = _liveNodesList.get((begin + step) % liveCount);
      if (node.hasCeilingCapacity) {
        // node has room to spare but is not allowed to hold this replica
        if (node.capacity > node.currentlyAssigned && !node.canAddIfCapacity(replica)
            && giver == null) {
          giver = node;
        }
      } else {
        // node is exactly full but could hold this replica if it had the capacity
        if (node.capacity == node.currentlyAssigned && node.canAddIfCapacity(replica)
            && taker == null) {
          taker = node;
        }
      }
    }
    if (giver == null || taker == null) {
      return false;
    }
    // move the ceiling capacity across and place the replica on the taker
    taker.steal(giver, replica);
    return true;
  }

  /**
   * Move replicas from too-full nodes to nodes that can accept the replicas
   */
  private void moveExcessReplicas() {
    // iterate over nodes and move extra load
    Iterator<Replica> it;
    for (Node donor : _liveNodesList) {
      if (donor.capacity < donor.currentlyAssigned) {
        Collections.sort(donor.nonPreferred);
        it = donor.nonPreferred.iterator();
        while (it.hasNext()) {
          Replica replica = it.next();
          int startIndex = computeRandomStartIndex(replica);
          for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
            Node receiver = _liveNodesList.get(index % _liveNodesList.size());
            if (receiver.canAdd(replica)) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



