in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/rest/resources/TopicManagementRestletResource.java [223:261]
// Handles a PUT request that expands a topic in a uReplicator pipeline.
//
// Inputs read from the request:
//   - request attribute "topicName": the topic to expand
//   - query parameter "src":         source cluster name
//   - query parameter "dst":         destination cluster name
//   - query parameter "partitions":  requested partition count (positive integer)
//
// Responses:
//   - 400 Bad Request when "src", "dst" or "partitions" is missing or malformed
//   - 404 Not Found   when the topic is not present in the src/dst pipeline
//   - 200 OK          when expandTopicInMirrorMaker completes without throwing
//   - 404 Not Found   when expandTopicInMirrorMaker throws (status kept from the
//                     original implementation; see the NOTE in the catch block)
public Representation put(Representation entity) {
  final String topicName = (String) getRequest().getAttributes().get("topicName");
  final Form queryParams = getRequest().getResourceRef().getQueryAsForm();
  final String srcCluster = queryParams.getFirstValue("src");
  final String dstCluster = queryParams.getFirstValue("dst");
  final String newNumPartitions = queryParams.getFirstValue("partitions");
  LOGGER.info("Received request to expand topic {} from {} to {} to {} partitions on uReplicator",
      topicName, srcCluster, dstCluster, newNumPartitions);

  // Validate the query parameters before touching cluster state, so that a
  // malformed request is reported as a client error instead of surfacing later
  // as a misleading "not found" or as an uncaught exception.
  if (srcCluster == null || srcCluster.isEmpty()
      || dstCluster == null || dstCluster.isEmpty()) {
    LOGGER.info("Missing src or dst cluster for topic {}, abandon expanding topic", topicName);
    return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_BAD_REQUEST,
        String.format(
            "Missing parameter 'src' or 'dst' for topic %s, abandon expanding topic!",
            topicName));
  }

  int numPartitions;
  try {
    // Integer.parseInt also rejects null with NumberFormatException, which covers
    // the case where the "partitions" parameter is absent.
    numPartitions = Integer.parseInt(newNumPartitions);
  } catch (NumberFormatException e) {
    LOGGER.info("Invalid partitions value {} for topic {}, abandon expanding topic",
        newNumPartitions, topicName);
    return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_BAD_REQUEST,
        String.format(
            "Invalid parameter 'partitions' (%s) for topic %s, abandon expanding topic!",
            newNumPartitions, topicName));
  }
  if (numPartitions <= 0) {
    LOGGER.info("Non-positive partitions value {} for topic {}, abandon expanding topic",
        newNumPartitions, topicName);
    return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_BAD_REQUEST,
        String.format(
            "Parameter 'partitions' (%s) must be positive for topic %s, abandon expanding topic!",
            newNumPartitions, topicName));
  }

  _helixMirrorMakerManager.updateCurrentStatus();
  final String pipeline = ControllerUtils.getPipelineName(srcCluster, dstCluster);
  if (!_helixMirrorMakerManager.isTopicPipelineExisted(topicName, pipeline)) {
    LOGGER.info("Topic {} doesn't exist in pipeline {}, abandon expanding topic", topicName,
        pipeline);
    return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_NOT_FOUND,
        String.format("Topic %s doesn't exist in pipeline %s, abandon expanding topic!",
            topicName, pipeline));
  }

  try {
    _helixMirrorMakerManager.expandTopicInMirrorMaker(topicName, srcCluster, pipeline,
        numPartitions);
    LOGGER.info("Successfully expand the topic {} in pipeline {} to {} partitions",
        topicName, pipeline, newNumPartitions);
    return getResponseJsonStringRepresentation(Status.SUCCESS_OK,
        String.format("Successfully expand the topic %s in pipeline %s to %s partitions",
            topicName, pipeline, newNumPartitions));
  } catch (Exception e) {
    LOGGER.error(String.format(
        "Failed to expand the topic %s in pipeline %s to %s partitions due to exception",
        topicName, pipeline, newNumPartitions), e);
    // NOTE(review): a failure while expanding is reported as 404 here, which is
    // kept for compatibility with existing clients; a 5xx status would describe
    // the situation more accurately - confirm before changing.
    return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_NOT_FOUND,
        String.format(
            "Failed to expand the topic %s in pipeline %s to %s partitions due to exception: %s",
            topicName, pipeline, newNumPartitions, e.toString()));
  }
}