public synchronized void updateTopicPartitionStateInMirrorMaker()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManager.java [251:291]


  public synchronized void updateTopicPartitionStateInMirrorMaker(String topicName, int partition,
      String state) {
    updateCurrentServingInstance();
    if (!Constants.HELIX_OFFLINE_STATE.equalsIgnoreCase(state) && !Constants.HELIX_ONLINE_STATE
        .equalsIgnoreCase(state)) {
      throw new IllegalArgumentException(
          String.format("Failed to update topic %s, partition %d to invalid state %s.",
              topicName, partition, state));
    }

    IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, topicName);
    String partitionName = String.valueOf(partition);
    if (idealState == null ||
        partition >= idealState.getNumPartitions()) {
      throw new IllegalArgumentException(
          String.format("Topic %s, partition %d not exists in current route.",
              topicName, partition));
    }

    String instanceName;
    if (idealState.getInstanceStateMap(partitionName).keySet().isEmpty()) {
      if (Constants.HELIX_OFFLINE_STATE.equalsIgnoreCase(state)) {
        throw new IllegalArgumentException(
            String.format("Topic %s, partition %d not exists in current route.",
                topicName, partition));
      } else if (_currentServingInstance.isEmpty()) {
        throw new InternalError("No available worker");
      }
      instanceName = _currentServingInstance.poll().getInstanceName();
    } else {
      instanceName = idealState.getInstanceStateMap(partitionName).keySet().iterator().next();
      String oldState = idealState.getInstanceStateMap(partitionName).get(instanceName);
      if (oldState.equalsIgnoreCase(state)) {
        throw new IllegalArgumentException(String.format("Topic %s, partition %d already set %s",
            idealState.getResourceName(), partition, state));
      }
    }

    idealState.setPartitionState(partitionName, instanceName, state);
    _helixAdmin.setResourceIdealState(_helixClusterName, topicName, idealState);
  }