in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [1249:1318]
/**
 * Expands a topic that already exists in a pipeline to a larger partition count.
 *
 * <p>The method is {@code synchronized} and additionally holds {@code _lock} for its whole
 * duration. Steps, in order:
 * <ol>
 *   <li>Verify the topic/pipeline pair is known, calling {@code updateCurrentStatus()} once
 *       and re-checking if it is not.</li>
 *   <li>Look up the holder registered for the pipeline and read the partition count it
 *       currently records for the topic.</li>
 *   <li>PUT a {@code {"topic", "numPartitions"}} entity to {@code /topics} on the host
 *       resolved from the holder's instance name.</li>
 *   <li>On a 200 response, replace the holder's {@code TopicPartition} entry with one carrying
 *       the new count and ask the observer of {@code srcCluster} to update the topic.</li>
 * </ol>
 *
 * @param topicName        name of the topic to expand
 * @param srcCluster       key used to look up the observer in the validation manager's map
 * @param pipeline         pipeline the topic is expected to belong to
 * @param newNumPartitions requested partition count; must be strictly greater than the count
 *                         currently recorded for the topic
 * @throws Exception if the topic is not in the pipeline, is not served by the pipeline's
 *                   holder, the requested count is not greater than the current one, or the
 *                   controller answers with a response code other than 200
 */
public synchronized void expandTopicInMirrorMaker(
    String topicName,
    String srcCluster,
    String pipeline,
    int newNumPartitions) throws Exception {
  _lock.lock();
  try {
    LOGGER.info("Trying to expand topic: {} in pipeline: {} to {} partitions",
        topicName, pipeline, newNumPartitions);

    // Unknown topic/pipeline: call updateCurrentStatus() once, then check again before failing.
    if (!isTopicPipelineExisted(topicName, pipeline)) {
      updateCurrentStatus();
    }
    if (!isTopicPipelineExisted(topicName, pipeline)) {
      LOGGER.info("Topic {} doesn't exist in pipeline {}, abandon expanding topic",
          topicName, pipeline);
      throw new Exception(String.format(
          "Topic %s doesn't exist in pipeline %s, abandon expanding topic!",
          topicName, pipeline));
    }

    InstanceTopicPartitionHolder controllerHolder =
        _topicToPipelineInstanceMap.get(topicName).get(pipeline);

    // Find the entry for this topic; its "partition" field is used here as the current count.
    boolean topicServed = false;
    int currentNumPartitions = 0;
    for (TopicPartition served : controllerHolder.getServingTopicPartitionSet()) {
      if (!served.getTopic().equals(topicName)) {
        continue;
      }
      topicServed = true;
      currentNumPartitions = served.getPartition();
      // Only growth is allowed: reject equal or smaller counts.
      if (newNumPartitions <= currentNumPartitions) {
        LOGGER.info(
            "New partition {} is not bigger than current partition {} of topic {}, abandon expanding topic",
            newNumPartitions, currentNumPartitions, topicName);
        throw new Exception(String.format(
            "New partition %s is not bigger than current partition %s of topic %s, "
                + "abandon expanding topic!",
            newNumPartitions, currentNumPartitions, topicName));
      }
    }
    if (!topicServed) {
      LOGGER.info("Failed to find topic {} in pipeline {}, abandon expanding topic",
          topicName, pipeline);
      throw new Exception(String.format(
          "Failed to find topic %s in pipeline %s, abandon expanding topic!",
          topicName, pipeline));
    }

    // Ask the controller behind this holder to expand the topic.
    JSONObject payload = new JSONObject();
    payload.put("topic", topicName);
    payload.put("numPartitions", newNumPartitions);
    HostAndPort controllerAddress = getHostInfo(controllerHolder.getInstanceName());
    int statusCode = HttpClientUtils.putData(
        _httpClient,
        _requestConfig,
        controllerAddress.getHost(),
        controllerAddress.getPort(),
        "/topics",
        payload);
    if (statusCode != 200) {
      LOGGER.info("Got error from controller {} when expanding topic {} with respCode {}",
          controllerHolder.getInstanceName(), topicName, statusCode);
      throw new Exception(String.format(
          "Got error from controller %s when expanding topic %s with respCode %s",
          controllerHolder.getInstanceName(), topicName, statusCode));
    }

    // Controller accepted the change: swap the holder's entry to the new partition count.
    TopicPartition previousEntry = new TopicPartition(topicName, currentNumPartitions, pipeline);
    controllerHolder.removeTopicPartition(previousEntry);
    TopicPartition expandedEntry = new TopicPartition(topicName, newNumPartitions, pipeline);
    controllerHolder.addTopicPartition(expandedEntry);

    // NOTE(review): the lookup by srcCluster is dereferenced without a null check; if no
    // observer is registered for that key this would presumably throw after the holder was
    // already updated above — confirm callers always pass a known cluster.
    _kafkaValidationManager.getClusterToObserverMap().get(srcCluster).tryUpdateTopic(topicName);
  } finally {
    _lock.unlock();
  }
}