in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManager.java [251:291]
/**
 * Sets a single partition of a topic to the online or offline state in the topic's Helix
 * ideal state and writes the updated ideal state back through the Helix admin.
 *
 * <p>{@code updateCurrentServingInstance()} is invoked before any validation (presumably to
 * refresh {@code _currentServingInstance} - confirm against its definition). If the partition
 * has no instance assigned yet, an instance is taken from {@code _currentServingInstance};
 * otherwise the first instance already listed for the partition is reused.
 *
 * @param topicName name of the Helix resource (topic) to update
 * @param partition zero-based partition number; must be below the ideal state's partition count
 * @param state requested state, compared case-insensitively against
 *     {@code Constants.HELIX_ONLINE_STATE} and {@code Constants.HELIX_OFFLINE_STATE}
 * @throws IllegalArgumentException if {@code state} is neither of the two accepted states, if
 *     the topic or partition is not present in the ideal state, if an unassigned partition is
 *     asked to go offline, or if the partition is already in the requested state
 * @throws InternalError if the partition is unassigned and no serving instance is available
 */
public synchronized void updateTopicPartitionStateInMirrorMaker(String topicName, int partition,
    String state) {
  updateCurrentServingInstance();

  // Validation is case-insensitive, so resolve the request to the constant's own spelling
  // and persist that instead of whatever casing the caller happened to use.
  final String targetState;
  if (Constants.HELIX_ONLINE_STATE.equalsIgnoreCase(state)) {
    targetState = Constants.HELIX_ONLINE_STATE;
  } else if (Constants.HELIX_OFFLINE_STATE.equalsIgnoreCase(state)) {
    targetState = Constants.HELIX_OFFLINE_STATE;
  } else {
    throw new IllegalArgumentException(
        String.format("Failed to update topic %s, partition %d to invalid state %s.",
            topicName, partition, state));
  }

  IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, topicName);
  // Reject negative partitions as well: they would otherwise pass the upper-bound check and
  // fail later with a NullPointerException instead of a meaningful error.
  if (idealState == null || partition < 0 || partition >= idealState.getNumPartitions()) {
    throw new IllegalArgumentException(
        String.format("Topic %s, partition %d not exists in current route.",
            topicName, partition));
  }

  String partitionName = String.valueOf(partition);
  // Fetched once; a missing map is handled the same way as an empty one (no instance assigned).
  java.util.Map<String, String> instanceStates = idealState.getInstanceStateMap(partitionName);

  String instanceName;
  if (instanceStates == null || instanceStates.isEmpty()) {
    // Nothing is assigned to this partition, so there is nothing to take offline.
    if (Constants.HELIX_OFFLINE_STATE.equals(targetState)) {
      throw new IllegalArgumentException(
          String.format("Topic %s, partition %d not exists in current route.",
              topicName, partition));
    }
    if (_currentServingInstance.isEmpty()) {
      // NOTE(review): InternalError is a VirtualMachineError; kept only so callers observe the
      // same exception type as before. Consider IllegalStateException in a follow-up.
      throw new InternalError("No available worker");
    }
    instanceName = _currentServingInstance.poll().getInstanceName();
  } else {
    instanceName = instanceStates.keySet().iterator().next();
    String oldState = instanceStates.get(instanceName);
    // Receiver is the non-null target state, so a null stored state cannot cause an NPE.
    if (targetState.equalsIgnoreCase(oldState)) {
      throw new IllegalArgumentException(String.format("Topic %s, partition %d already set %s",
          idealState.getResourceName(), partition, targetState));
    }
  }

  idealState.setPartitionState(partitionName, instanceName, targetState);
  _helixAdmin.setResourceIdealState(_helixClusterName, topicName, idealState);
}