in src/main/java/org/opensearch/performanceanalyzer/rest/QueryMetricsRequestHandler.java [88:235]
/**
 * Handles a metrics query request.
 *
 * <p>Flow, in order:
 *
 * <ol>
 *   <li>Responds with 503 (and a JSON error body) when the {@code ReaderMetricsProcessor}
 *       singleton or its current metrics database entry is not available.
 *   <li>Responds with 404 and no body for any method other than GET.
 *   <li>Delegates to {@code getMetricUnits} when the request is a unit look-up.
 *   <li>Otherwise parses the {@code metrics}, {@code agg}, {@code dim} and {@code nodes} query
 *       parameters, queries the local metrics database and, when {@code nodes=all} and more
 *       than one cluster instance is known, also collects responses from the other instances
 *       (waiting up to {@code TIME_OUT_VALUE} {@code TIME_OUT_UNIT}) before replying.
 * </ol>
 *
 * <p>An {@code InvalidParameterException} raised while handling the query is reported as 400;
 * any other exception is reported as 500.
 *
 * @param exchange the HTTP exchange carrying the request and used to write the response
 * @throws IOException if writing the response fails
 */
public void handle(HttpExchange exchange) throws IOException {
    String requestMethod = exchange.getRequestMethod();
    LOG.info("{} {} {}", requestMethod, exchange.getRemoteAddress(), exchange.getRequestURI());

    ReaderMetricsProcessor mp = ReaderMetricsProcessor.getInstance();
    if (mp == null) {
        sendResponse(
                exchange,
                "{\"error\":\"Metrics Processor is not initialized. The reader has run into an issue or has just started.\"}",
                HttpURLConnection.HTTP_UNAVAILABLE);
        LOG.warn(
                "Metrics Processor is not initialized. The reader has run into an issue or has just started.");
        return;
    }

    Map.Entry<Long, MetricsDB> dbEntry = mp.getMetricsDB();
    if (dbEntry == null) {
        sendResponse(
                exchange,
                "{\"error\":\"There are no metrics databases. The reader has run into an issue or has just started.\"}",
                HttpURLConnection.HTTP_UNAVAILABLE);
        LOG.warn(
                "There are no metrics databases. The reader has run into an issue or has just started.");
        return;
    }
    MetricsDB db = dbEntry.getValue();
    Long dbTimestamp = dbEntry.getKey();

    // Only GET is served; everything else gets a body-less 404 (-1 = no response body).
    if (!"GET".equalsIgnoreCase(requestMethod)) {
        exchange.sendResponseHeaders(HttpURLConnection.HTTP_NOT_FOUND, -1);
        exchange.close();
        return;
    }

    LOG.debug("Query handler called.");
    if (isUnitLookUp(exchange)) {
        getMetricUnits(exchange);
        return;
    }

    Map<String, String> params = getParamsMap(exchange.getRequestURI().getQuery());
    exchange.getResponseHeaders().set("Content-Type", "application/json");
    try {
        String nodes = params.get("nodes");
        List<String> metricList = metricsRestUtil.parseArrayParam(params, "metrics", false);
        List<String> aggList = metricsRestUtil.parseArrayParam(params, "agg", false);
        List<String> dimList = metricsRestUtil.parseArrayParam(params, "dim", true);

        if (metricList.size() != aggList.size()) {
            sendResponse(
                    exchange,
                    "{\"error\":\"metrics/aggregations should have the same number of entries.\"}",
                    HttpURLConnection.HTTP_BAD_REQUEST);
            return;
        }
        // validParams receives the exchange; presumably it writes the error response itself
        // when validation fails, so we only need to stop here.
        if (!validParams(exchange, metricList, dimList, aggList)) {
            return;
        }

        // A missing database or a null query result both map to an empty JSON object.
        String localResponse = "{}";
        if (db != null) {
            Result<Record> metricResult = db.queryMetric(metricList, aggList, dimList);
            if (metricResult != null) {
                localResponse = metricResult.formatJSON();
            }
        }
        String localResponseWithTimestamp =
                String.format("{\"timestamp\": %d, \"data\": %s}", dbTimestamp, localResponse);

        ConcurrentHashMap<String, String> nodeResponses = new ConcurrentHashMap<>();
        final List<InstanceDetails> allNodes = appContext.getAllClusterInstances();
        // The first cluster instance, when present, supplies the id for the local response.
        String localNodeId = "local";
        if (!allNodes.isEmpty()) {
            localNodeId = allNodes.get(0).getInstanceId().toString();
        }
        nodeResponses.put(localNodeId, localResponseWithTimestamp);
        String response = metricsRestUtil.nodeJsonBuilder(nodeResponses);

        boolean queryAllNodes = "all".equals(nodes) && allNodes.size() > 1;
        if (!queryAllNodes) {
            sendResponse(exchange, response, HttpURLConnection.HTTP_OK);
            return;
        }

        // One latch count per remote instance (index 0 was answered locally above).
        CountDownLatch doneSignal = new CountDownLatch(allNodes.size() - 1);
        for (int i = 1; i < allNodes.size(); i++) {
            InstanceDetails node = allNodes.get(i);
            LOG.debug("Collecting remote stats");
            try {
                collectRemoteStats(
                        node, metricList, aggList, dimList, nodeResponses, doneSignal);
            } catch (Exception e) {
                LOG.error(
                        "Unable to collect stats for node, addr:{}, exception: {} ExceptionCode: {}",
                        node.getInstanceIp(),
                        e,
                        StatExceptionCode.REQUEST_REMOTE_ERROR.toString());
                StatsCollector.instance().logException(StatExceptionCode.REQUEST_REMOTE_ERROR);
            }
        }
        boolean completed = doneSignal.await(TIME_OUT_VALUE, TIME_OUT_UNIT);
        if (!completed) {
            LOG.debug("Timeout while collecting remote stats");
            StatsCollector.instance().logException(StatExceptionCode.REQUEST_REMOTE_ERROR);
        }
        // Reply with whatever has been gathered, even after a timeout.
        sendResponseWhenRequestCompleted(nodeResponses, exchange);
    } catch (InvalidParameterException e) {
        logQueryFailure(db, e);
        // NOTE(review): the exception message is embedded without JSON escaping; a message
        // containing quotes would yield malformed JSON - confirm whether callers rely on this.
        String response = "{\"error\":\"" + e.getMessage() + "\"}";
        sendResponse(exchange, response, HttpURLConnection.HTTP_BAD_REQUEST);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag cleared by the latch await so callers can observe it.
            Thread.currentThread().interrupt();
        }
        logQueryFailure(db, e);
        String response = "{\"error\":\"" + e.toString() + "\"}";
        sendResponse(exchange, response, HttpURLConnection.HTTP_INTERNAL_ERROR);
    }
}

/**
 * Logs a failed query (database file path, exception details) and records the failure with
 * the stats collector. Tolerates a null database so that logging never masks the original
 * error.
 */
private void logQueryFailure(MetricsDB db, Exception e) {
    Object dbFilePath = (db == null) ? "unavailable" : db.getDBFilePath();
    LOG.error("DB file path : {}", dbFilePath);
    LOG.error(
            (Supplier<?>)
                    () ->
                            new ParameterizedMessage(
                                    "QueryException {} ExceptionCode: {}.",
                                    e.toString(),
                                    StatExceptionCode.REQUEST_ERROR.toString()),
            e);
    StatsCollector.instance().logException(StatExceptionCode.REQUEST_ERROR);
}