in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/rest/resources/TopicManagementRestletResource.java [75:144]
/**
 * Handles a GET request, dispatching on the optional {@code topicName} request attribute.
 *
 * <ul>
 *   <li>No {@code topicName}: refreshes the manager's current status and returns the mapping of
 *       every topic to its pipelines and their controller/route/workers.</li>
 *   <li>{@code topicName} is a pipeline name (per {@code ControllerUtils.isPipelineName}): returns
 *       the helix info for that pipeline, or a 404 response if the pipeline does not exist.</li>
 *   <li>{@code topicName} is an existing topic: returns both the manager view and the controller
 *       view of the topic.</li>
 *   <li>Otherwise: returns a 404 response.</li>
 * </ul>
 *
 * @return the JSON representation of the requested view or of the error
 */
public Representation get() {
  final String topicName = (String) getRequest().getAttributes().get("topicName");

  // Get whole picture of the deployment
  if (topicName == null) {
    // TODO: updateCurrentStatus might take a long time
    _helixMirrorMakerManager.updateCurrentStatus();
    Map<String, Map<String, InstanceTopicPartitionHolder>> topicToPipelineInstanceMap =
        _helixMirrorMakerManager.getTopicToPipelineInstanceMap();
    if (topicToPipelineInstanceMap == null || topicToPipelineInstanceMap.isEmpty()) {
      return getResponseJsonStringRepresentation(Status.SUCCESS_OK,
          "No topic is added in uReplicator!");
    }
    JSONObject responseJson = new JSONObject();
    responseJson.put("status", Status.SUCCESS_OK.getCode());
    responseJson.put("message", buildTopicToInstanceMappingJson(topicToPipelineInstanceMap));
    return new StringRepresentation(responseJson.toJSONString());
  }

  // Get pipeline information
  if (ControllerUtils.isPipelineName(topicName)) {
    try {
      if (_helixMirrorMakerManager.isPipelineExisted(topicName)) {
        return getResponseJsonStringRepresentation(Status.SUCCESS_OK,
            getHelixInfoJsonFromManager(topicName));
      }
      return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_NOT_FOUND,
          String.format("Failed to get view for route: %s, it is not existed!", topicName));
    } catch (Exception e) {
      // The trailing throwable argument lets the logger attach the stack trace.
      LOGGER.error("Failed to get view for topic: {} due to exception: {}", topicName, e, e);
      return getResponseJsonStringRepresentation(Status.SERVER_ERROR_INTERNAL,
          String.format("Failed to get view for route: %s due to exception: %s", topicName, e));
    }
  }

  // Get topic information
  if (_helixMirrorMakerManager.isTopicExisted(topicName)) {
    LOGGER.info("_topicToPipelineInstanceMap: {} to {}", topicName,
        _helixMirrorMakerManager.getTopic(topicName));
    // TODO: add worker information
    JSONObject messageJson = new JSONObject();
    messageJson.put("managerView", getHelixInfoJsonFromManager(topicName));
    messageJson.put("controllerView",
        _helixMirrorMakerManager.getTopicInfoFromController(topicName));
    return getResponseJsonStringRepresentation(Status.SUCCESS_OK, messageJson);
  }

  return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_NOT_FOUND,
      String.format("Failed to find topic: %s", topicName));
}

/**
 * Builds the deployment-wide JSON: a "topics" entry listing all topic names, plus one entry per
 * topic that maps each pipeline to its controller, route and workers.
 *
 * <p>Everything is derived from the single map snapshot passed in, so the "topics" list and the
 * per-topic entries always describe the same set of topics.
 *
 * @param topicToPipelineInstanceMap non-null map of topic name to (pipeline name to instance)
 * @return the JSON object used as the "message" of the response
 */
private JSONObject buildTopicToInstanceMappingJson(
    Map<String, Map<String, InstanceTopicPartitionHolder>> topicToPipelineInstanceMap) {
  JSONObject topicToInstanceMappingJson = new JSONObject();
  // NOTE(review): "topics" shares a namespace with the topic names put below, so a topic that is
  // literally named "topics" would overwrite this entry.
  topicToInstanceMappingJson.put("topics", topicToPipelineInstanceMap.keySet());
  for (Map.Entry<String, Map<String, InstanceTopicPartitionHolder>> topicEntry
      : topicToPipelineInstanceMap.entrySet()) {
    JSONObject topicInfoJson = new JSONObject();
    for (Map.Entry<String, InstanceTopicPartitionHolder> pipelineEntry
        : topicEntry.getValue().entrySet()) {
      InstanceTopicPartitionHolder instance = pipelineEntry.getValue();
      JSONObject instanceInfoJson = new JSONObject();
      instanceInfoJson.put("controller", instance.getInstanceName());
      instanceInfoJson.put("route", instance.getRouteString());
      instanceInfoJson.put("workers", instance.getWorkerSet());
      topicInfoJson.put(pipelineEntry.getKey(), instanceInfoJson);
    }
    topicToInstanceMappingJson.put(topicEntry.getKey(), topicInfoJson);
  }
  return topicToInstanceMappingJson;
}