in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/rest/resources/TopicManagementRestletResource.java [148:219]
/**
 * Handles a request to whitelist (add) a topic to a uReplicator pipeline.
 *
 * <p>The topic name is read from the {@code topicName} request attribute; the source and
 * destination clusters are read from the {@code src} and {@code dst} query parameters.
 *
 * <p>Responses:
 * <ul>
 *   <li>{@code CLIENT_ERROR_NOT_FOUND} - the src/dst pair is not a valid pipeline, or the
 *       topic is not found in the source or the destination cluster.</li>
 *   <li>{@code SUCCESS_OK} - the topic was added, or it was already present in the
 *       pipeline.</li>
 *   <li>{@code SERVER_ERROR_INTERNAL} - adding the topic threw an exception.</li>
 * </ul>
 *
 * @param entity the request entity; not read by this method
 * @return a JSON string representation carrying the status and a human-readable message
 */
public Representation post(Representation entity) {
  final String topicName = (String) getRequest().getAttributes().get("topicName");
  Form queryParams = getRequest().getResourceRef().getQueryAsForm();
  String srcCluster = queryParams.getFirstValue("src");
  String dstCluster = queryParams.getFirstValue("dst");
  LOGGER.info("Received request to whitelist topic {} from {} to {} on uReplicator ",
      topicName, srcCluster, dstCluster);

  // Reject src->dst combinations that are not a valid pipeline.
  if (!isValidPipeline(srcCluster, dstCluster)) {
    LOGGER.warn(
        "Failed to whitelist topic {} on uReplicator because of not valid pipeline from {} to {}",
        topicName, srcCluster, dstCluster);
    return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_NOT_FOUND,
        String.format(
            "Failed to whitelist topic %s on uReplicator because of not valid pipeline from %s to %s",
            topicName, srcCluster, dstCluster));
  }

  // Check if topic in source cluster
  TopicPartition srcTopicPartitionInfo = _clusterToObserverMap.get(srcCluster)
      .getTopicPartitionWithRefresh(topicName);
  LOGGER.info("Source topicPartitionInfo: {}", srcTopicPartitionInfo);
  if (srcTopicPartitionInfo == null) {
    LOGGER
        .warn("Failed to whitelist topic {} on uReplicator because of not exists in src cluster",
            topicName);
    return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_NOT_FOUND,
        String.format(
            "Failed to whitelist new topic: %s, it's not existed in source Kafka cluster!",
            topicName));
  }

  // Check if topic in destination cluster
  TopicPartition dstTopicPartitionInfo = _clusterToObserverMap.get(dstCluster)
      .getTopicPartitionWithRefresh(topicName);
  LOGGER.info("Destination topicPartitionInfo: {}", dstTopicPartitionInfo);
  if (dstTopicPartitionInfo == null) {
    LOGGER
        .warn("Failed to whitelist topic {} on uReplicator because of not exists in dst cluster",
            topicName);
    return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_NOT_FOUND,
        String.format(
            "Failed to whitelist new topic: %s, it's not existed in destination Kafka cluster!",
            topicName));
  }

  // Refresh the manager's status before testing whether the topic is already in the pipeline.
  _helixMirrorMakerManager.updateCurrentStatus();
  String pipeline = ControllerUtils.getPipelineName(srcCluster, dstCluster);
  if (_helixMirrorMakerManager.isTopicPipelineExisted(topicName, pipeline)) {
    // NOTE(review): answered with SUCCESS_OK even though the message says "Failed";
    // presumably so repeated requests are treated as idempotent - confirm with callers.
    LOGGER.info("Topic {} already on uReplicator in pipeline {}", topicName, pipeline);
    return getResponseJsonStringRepresentation(Status.SUCCESS_OK,
        String.format("Failed to add new topic: %s from: %s to: %s, it is already existed!",
            topicName, srcCluster, dstCluster));
  }

  try {
    // The topic name and partition value come from the source cluster's observer.
    _helixMirrorMakerManager.addTopicToMirrorMaker(srcTopicPartitionInfo.getTopic(),
        srcTopicPartitionInfo.getPartition(), srcCluster, dstCluster, pipeline);
    LOGGER.info("Successfully whitelist the topic {} from {} to {}", topicName, srcCluster,
        dstCluster);
    return getResponseJsonStringRepresentation(Status.SUCCESS_OK,
        String.format("Successfully add new topic: %s from: %s to: %s",
            srcTopicPartitionInfo.getTopic(), srcCluster, dstCluster));
  } catch (Exception e) {
    // Log at ERROR and pass the exception as the trailing argument (no placeholder) so the
    // logger records the full stack trace instead of leaving an unfilled "{}" in the message.
    LOGGER.error("Failed to whitelist the topic {} from {} to {} due to exception",
        topicName, srcCluster, dstCluster, e);
    return getResponseJsonStringRepresentation(Status.SERVER_ERROR_INTERNAL,
        String.format("Failed add new topic: %s from: %s to: %s due to exception: %s",
            srcTopicPartitionInfo.getTopic(), srcCluster, dstCluster, e.toString()));
  }
}