in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [1320:1354]
public synchronized void deletePipelineInMirrorMaker(String pipeline) {
// TODO: delete topic first
_lock.lock();
try {
LOGGER.info("Trying to delete pipeline: {}", pipeline);
PriorityQueue<InstanceTopicPartitionHolder> itphSet = _pipelineToInstanceMap.get(pipeline);
for (InstanceTopicPartitionHolder itph : itphSet) {
if (itph.getTotalNumPartitions() != 0) {
throw new UnsupportedOperationException(
"Delete non-empty pipeline is not allowed, serving number of partitions: "
+ String.valueOf(itph.getTotalNumPartitions()));
}
}
_workerHelixManager.deletePipelineInMirrorMaker(pipeline);
_helixAdmin.dropResource(_helixClusterName, pipeline);
_pipelineToInstanceMap.remove(pipeline);
// Maybe clear instanceHolder's worker set
List<String> topicsToDelete = new ArrayList<>();
for (String topic : _topicToPipelineInstanceMap.keySet()) {
if (_topicToPipelineInstanceMap.get(topic).containsKey(pipeline)) {
_topicToPipelineInstanceMap.get(topic).remove(pipeline);
}
if (_topicToPipelineInstanceMap.get(topic).isEmpty()) {
topicsToDelete.add(topic);
}
}
for (String topic : topicsToDelete) {
_topicToPipelineInstanceMap.remove(topic);
}
} finally {
_lock.unlock();
}
}