in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/TopicManagementRestletResource.java [115:178]
/**
 * Handles a POST request that whitelists (adds) a topic.
 *
 * <p>Two paths exist:
 * <ul>
 *   <li>Federated mode ({@code _managerControllerHelix != null}): the query parameters
 *       {@code src}, {@code dst} and {@code routeid} are all required, and the topic is
 *       handed to {@code handleTopicAssignmentEvent} with the state {@code "ONLINE"}.</li>
 *   <li>Otherwise: the topic description is taken from the JSON request body, or, when the
 *       body is empty and a source-cluster observer is configured, looked up in the source
 *       cluster by the {@code topicName} request attribute. The topic is then added through
 *       {@code _helixMirrorMakerManager} unless it already exists there.</li>
 * </ul>
 *
 * @param entity the request body; may be {@code null} or empty when the caller relies on the
 *     {@code topicName} request attribute instead of a JSON blob
 * @return a plain-text representation describing the outcome; on failure the response status
 *     is additionally set to a 4xx/5xx code
 */
public Representation post(Representation entity) {
  try {
    final String topicName = (String) getRequest().getAttributes().get("topicName");
    LOGGER.info("received request to whitelist topic {} on mm ", topicName);
    if (_managerControllerHelix != null) {
      // federated mode
      Form params = getRequest().getResourceRef().getQueryAsForm();
      String srcCluster = params.getFirstValue("src");
      String dstCluster = params.getFirstValue("dst");
      String routeId = params.getFirstValue("routeid");
      if (srcCluster == null || dstCluster == null || routeId == null) {
        getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
        return new StringRepresentation("Missing parameters for whitelisting topic " + topicName
            + " in federated uReplicator");
      }
      if (!_managerControllerHelix.handleTopicAssignmentEvent(topicName, srcCluster, dstCluster, routeId, "ONLINE")) {
        getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
        String resp = String.format("Failed to add new topic: %s, src=%s, dst=%s, routeid=%s",
            topicName, srcCluster, dstCluster, routeId);
        LOGGER.info(resp);
        return new StringRepresentation(resp);
      }
      String resp = String.format("Successfully add new topic: %s, src=%s, dst=%s, routeid=%s",
          topicName, srcCluster, dstCluster, routeId);
      LOGGER.info(resp);
      return new StringRepresentation(resp);
    }

    // A body-less request may carry no entity at all; treat that the same as an empty body
    // instead of failing with a NullPointerException.
    String jsonRequest = (entity == null) ? null : entity.getText();
    boolean hasJsonBody = jsonRequest != null && !jsonRequest.isEmpty();

    TopicPartition topicPartitionInfo;
    if (!hasJsonBody && topicName != null && _srcKafkaBrokerTopicObserver != null) {
      // Only triggered when srcKafkaObserver is there and curl call has no json blob.
      topicPartitionInfo = _srcKafkaBrokerTopicObserver.getTopicPartitionWithRefresh(topicName);
      if (topicPartitionInfo == null) {
        LOGGER.warn("failed to whitelist topic {} on mm because of not exists in src cluster", topicName);
        getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
        return new StringRepresentation(String.format(
            "Failed to add new topic: %s, it's not exsited in source kafka cluster!", topicName));
      }
    } else {
      topicPartitionInfo = TopicPartition.init(jsonRequest);
      if (topicPartitionInfo == null) {
        // Nothing usable could be built from the body (e.g. it was missing or empty and no
        // source observer is available). Report a client error rather than dereferencing null.
        LOGGER.warn("failed to whitelist topic {} on mm because no topic info could be read from request body",
            topicName);
        getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
        return new StringRepresentation(String.format(
            "Failed to add new topic: %s, missing or invalid topic info in request body", topicName));
      }
    }

    // Use the resolved topic for everything below: when the topic came from the JSON body,
    // the topicName request attribute may be absent or differ from it.
    final String resolvedTopic = topicPartitionInfo.getTopic();
    if (_autoTopicWhitelistingManager != null) {
      _autoTopicWhitelistingManager.removeFromBlacklist(resolvedTopic);
    }
    if (_helixMirrorMakerManager.isTopicExisted(resolvedTopic)) {
      LOGGER.info("topic {} already on mm", resolvedTopic);
      // NOTE(review): NOT_FOUND is kept as-is for existing clients; a conflict-style status
      // would describe "already exists" more accurately — confirm before changing.
      getResponse().setStatus(Status.CLIENT_ERROR_NOT_FOUND);
      return new StringRepresentation(String.format(
          "Failed to add new topic: %s, it is already existed!", resolvedTopic));
    }
    _helixMirrorMakerManager.addTopicToMirrorMaker(topicPartitionInfo);
    LOGGER.info("successuflly whitelist the topic {}", resolvedTopic);
    return new StringRepresentation(
        String.format("Successfully add new topic: %s", topicPartitionInfo));
  } catch (IOException e) {
    LOGGER.error("Got error during processing Post request", e);
    getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
    return new StringRepresentation(
        String.format("Failed to add new topic, with exception: %s", e));
  }
}