in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/rest/resources/TopicManagementRestletResource.java [265:322]
public Representation delete() {
final String topicName = (String) getRequest().getAttributes().get("topicName");
if (ControllerUtils.isPipelineName(topicName)) {
// Delete pipeline
LOGGER.info("Received request to delete pipeline {} on uReplicator ", topicName);
if (!_helixMirrorMakerManager.isPipelineExisted(topicName)) {
LOGGER.info("Failed to delete not existed pipeline {}", topicName);
return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_NOT_FOUND,
String.format("Failed to delete not existed pipeline: %s", topicName));
}
try {
_helixMirrorMakerManager.deletePipelineInMirrorMaker(topicName);
LOGGER.info("Successfully delete pipeline: {}", topicName);
return getResponseJsonStringRepresentation(Status.SUCCESS_OK,
String.format("Successfully delete pipeline: %s", topicName));
} catch (Exception e) {
e.printStackTrace();
LOGGER.error("Failed to delete topic: {} due to exception: {}", topicName, e);
return getResponseJsonStringRepresentation(Status.SERVER_ERROR_INTERNAL,
String.format("Failed to delete topic: %s due to exception: %s", topicName, e));
}
} else {
// Delete topic
Form queryParams = getRequest().getResourceRef().getQueryAsForm();
String srcCluster = queryParams.getFirstValue("src");
String dstCluster = queryParams.getFirstValue("dst");
String pipeline = ControllerUtils.getPipelineName(srcCluster, dstCluster);
LOGGER.info("Received request to delete topic {} from {} to {} on uReplicator ",
topicName, srcCluster, dstCluster);
if (_helixMirrorMakerManager.isTopicPipelineExisted(topicName, pipeline)) {
try {
_helixMirrorMakerManager
.deleteTopicInMirrorMaker(topicName, srcCluster, dstCluster, pipeline);
LOGGER.info("Successfully delete topic: {} from {} to {}", topicName, srcCluster,
dstCluster);
return getResponseJsonStringRepresentation(Status.SUCCESS_OK,
String.format("Successfully delete topic: %s from %s to %s", topicName, srcCluster,
dstCluster));
} catch (Exception e) {
LOGGER.info("Failed to delete the topic {} from {} to {} due to exception {}",
topicName, srcCluster, dstCluster, e);
return getResponseJsonStringRepresentation(Status.SERVER_ERROR_INTERNAL,
String.format("Failed to delete topic: %s from: %s to: %s due to exception: %s",
topicName, srcCluster, dstCluster, e.toString()));
}
} else {
LOGGER.info("Failed to delete not existed topic {} in pipeline {}", topicName, pipeline);
// return 200 for deleting non-exist topic so caller won't try to delete again
return getResponseJsonStringRepresentation(Status.SUCCESS_OK,
String.format("Failed to delete not existed topic: %s in pipeline: %s", topicName,
pipeline));
}
}
}