public synchronized void expandTopicInMirrorMaker()

in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [1249:1318]


  public synchronized void expandTopicInMirrorMaker(
      String topicName,
      String srcCluster,
      String pipeline,
      int newNumPartitions) throws Exception {
    _lock.lock();
    try {
      LOGGER
          .info("Trying to expand topic: {} in pipeline: {} to {} partitions", topicName, pipeline,
              newNumPartitions);

      if (!isTopicPipelineExisted(topicName, pipeline)) {
        updateCurrentStatus();
      }
      if (!isTopicPipelineExisted(topicName, pipeline)) {
        LOGGER.info("Topic {} doesn't exist in pipeline {}, abandon expanding topic", topicName,
            pipeline);
        throw new Exception(
            String.format("Topic %s doesn't exist in pipeline %s, abandon expanding topic!",
                topicName, pipeline));
      }

      InstanceTopicPartitionHolder itph = _topicToPipelineInstanceMap.get(topicName).get(pipeline);

      boolean found = false;
      int oldNumPartitions = 0;
      for (TopicPartition tp : itph.getServingTopicPartitionSet()) {
        if (tp.getTopic().equals(topicName)) {
          found = true;
          oldNumPartitions = tp.getPartition();
          if (newNumPartitions <= oldNumPartitions) {
            LOGGER.info(
                "New partition {} is not bigger than current partition {} of topic {}, abandon expanding topic",
                newNumPartitions, oldNumPartitions, topicName);
            throw new Exception(String
                .format("New partition %s is not bigger than current partition %s of topic %s, "
                    + "abandon expanding topic!", newNumPartitions, oldNumPartitions, topicName));
          }
        }
      }

      if (!found) {
        LOGGER.info("Failed to find topic {} in pipeline {}, abandon expanding topic", topicName,
            pipeline);
        throw new Exception(
            String.format("Failed to find topic %s in pipeline %s, abandon expanding topic!",
                topicName, pipeline));
      }

      JSONObject entity = new JSONObject();
      entity.put("topic", topicName);
      entity.put("numPartitions", newNumPartitions);
      HostAndPort hostInfo = getHostInfo(itph.getInstanceName());
      int respCode = HttpClientUtils.putData(_httpClient, _requestConfig,
          hostInfo.getHost(), hostInfo.getPort(), "/topics", entity);
      if (respCode != 200) {
        LOGGER.info("Got error from controller {} when expanding topic {} with respCode {}",
            itph.getInstanceName(), topicName, respCode);
        throw new Exception(
            String.format("Got error from controller %s when expanding topic %s with respCode %s",
                itph.getInstanceName(), topicName, respCode));
      }

      itph.removeTopicPartition(new TopicPartition(topicName, oldNumPartitions, pipeline));
      itph.addTopicPartition(new TopicPartition(topicName, newNumPartitions, pipeline));
      _kafkaValidationManager.getClusterToObserverMap().get(srcCluster).tryUpdateTopic(topicName);
    } finally {
      _lock.unlock();
    }
  }