public synchronized void deletePipelineInMirrorMaker()

in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [1320:1354]


  /**
   * Deletes a pipeline that no longer serves any partitions.
   *
   * <p>The pipeline is removed from the worker Helix manager, its Helix resource is dropped from
   * the cluster, and it is purged from the in-memory pipeline and topic maps. Topics left without
   * any pipeline after the purge are removed from the topic map as well.
   *
   * <p>The whole operation runs while holding both this object's monitor and {@code _lock}.
   * If the pipeline is not present in the pipeline map, a warning is logged and the call returns
   * without modifying any state.
   *
   * @param pipeline name of the pipeline to delete
   * @throws UnsupportedOperationException if any instance holder of the pipeline still reports a
   *     non-zero number of partitions
   */
  public synchronized void deletePipelineInMirrorMaker(String pipeline) {
    // TODO: delete topic first
    _lock.lock();
    try {
      LOGGER.info("Trying to delete pipeline: {}", pipeline);
      PriorityQueue<InstanceTopicPartitionHolder> itphSet = _pipelineToInstanceMap.get(pipeline);
      if (itphSet == null) {
        // Unknown pipeline: bail out before touching Helix or the in-memory maps instead of
        // failing with a NullPointerException in the loop below.
        LOGGER.warn("Pipeline {} is not registered, nothing to delete", pipeline);
        return;
      }

      // Refuse to delete while any instance of the pipeline still serves partitions.
      for (InstanceTopicPartitionHolder itph : itphSet) {
        if (itph.getTotalNumPartitions() != 0) {
          throw new UnsupportedOperationException(
              "Delete non-empty pipeline is not allowed, serving number of partitions: "
                  + itph.getTotalNumPartitions());
        }
      }

      _workerHelixManager.deletePipelineInMirrorMaker(pipeline);
      _helixAdmin.dropResource(_helixClusterName, pipeline);

      _pipelineToInstanceMap.remove(pipeline);
      // Maybe clear instanceHolder's worker set

      // Detach the pipeline from every topic. Topics that end up with no pipeline are collected
      // first and removed afterwards so the map is not structurally modified while iterating.
      List<String> topicsToDelete = new ArrayList<>();
      for (String topic : _topicToPipelineInstanceMap.keySet()) {
        // remove() is a no-op when the topic has no entry for this pipeline.
        _topicToPipelineInstanceMap.get(topic).remove(pipeline);
        if (_topicToPipelineInstanceMap.get(topic).isEmpty()) {
          topicsToDelete.add(topic);
        }
      }
      for (String topic : topicsToDelete) {
        _topicToPipelineInstanceMap.remove(topic);
      }
    } finally {
      _lock.unlock();
    }
  }