in src/main/java/org/opensearch/performanceanalyzer/rest/QueryBatchRequestHandler.java [89:263]
/**
 * Serves a batch metrics query.
 *
 * <p>Only GET requests are served; any other method is answered with {@code 404} and no body.
 * If the metrics processor is missing, batch metrics are not enabled, or no metrics databases
 * exist yet, the request is answered with {@code 503} and a JSON error body.
 *
 * <p>Accepted query parameters are {@code metrics}, {@code starttime}, {@code endtime} and
 * {@code samplingperiod}; any other parameter name is rejected. {@code starttime} and
 * {@code endtime} are required non-negative integers compared against
 * {@link System#currentTimeMillis()}. {@code samplingperiod} is optional, is interpreted in
 * seconds (it is multiplied by 1000 before use), must be a multiple of 5 that is at least 5,
 * and must be shorter than the configured batch metrics retention period. Both timestamps are
 * rounded down to a multiple of the sampling period before the query runs.
 *
 * <p>Invalid parameters produce {@code 400}; a {@code DataAccessException} or any other
 * unexpected exception produces {@code 500}; success produces {@code 200} with the query
 * result. Exception text placed in an error body is escaped so the body stays valid JSON even
 * when it echoes caller-supplied values.
 *
 * @param exchange the HTTP exchange carrying the request and used to write the response
 * @throws IOException if writing the response fails
 */
public void handle(HttpExchange exchange) throws IOException {
    String requestMethod = exchange.getRequestMethod();
    if (!requestMethod.equalsIgnoreCase("GET")) {
        exchange.sendResponseHeaders(HttpURLConnection.HTTP_NOT_FOUND, -1);
        exchange.close();
        return;
    }
    ReaderMetricsProcessor mp = ReaderMetricsProcessor.getInstance();
    if (mp == null) {
        sendResponse(
                exchange,
                "{\"error\":\"Metrics Processor is not initialized. The reader has run into an issue or has just started.\"}",
                HttpURLConnection.HTTP_UNAVAILABLE);
        LOG.warn(
                "Metrics Processor is not initialized. The reader has run into an issue or has just started.");
        return;
    }
    NavigableSet<Long> batchMetrics = mp.getBatchMetrics();
    // Captured once, before validation, so every time check below uses the same "now".
    long currentTime = System.currentTimeMillis();
    if (batchMetrics == null) {
        sendResponse(
                exchange,
                "{\"error\":\"The batch metrics api has not been enabled for this node.\"}",
                HttpURLConnection.HTTP_UNAVAILABLE);
        LOG.warn("The batch metrics api has not been enabled for this node.");
        return;
    }
    if (batchMetrics.isEmpty()) {
        sendResponse(
                exchange,
                "{\"error\":\"There are no metrics databases. The reader has run into an issue or has just started.\"}",
                HttpURLConnection.HTTP_UNAVAILABLE);
        LOG.warn(
                "There are no metrics databases. The reader has run into an issue or has just started.");
        return;
    }
    exchange.getResponseHeaders().set("Content-Type", "application/json");
    Map<String, String> params = getParamsMap(exchange.getRequestURI().getQuery());
    try {
        // Reject any parameter name outside the supported set.
        String[] validParamsTmp = {"", "metrics", "starttime", "endtime", "samplingperiod"};
        Set<String> validParams = new HashSet<>(Arrays.asList(validParamsTmp));
        for (String param : params.keySet()) {
            if (!validParams.contains(param)) {
                throw new InvalidParameterException(
                        String.format("%s is an invalid parameter", param));
            }
        }
        List<String> metrics = metricsRestUtil.parseArrayParam(params, "metrics", false);
        String startTimeParam = params.get("starttime");
        String endTimeParam = params.get("endtime");
        String samplingPeriodParam = params.get("samplingperiod");
        for (String metric : metrics) {
            if (!MetricsModel.ALL_METRICS.containsKey(metric)) {
                throw new InvalidParameterException(
                        String.format("%s is an invalid metric", metric));
            }
        }
        long startTime = parseRequiredTimestamp("starttime", startTimeParam);
        long endTime = parseRequiredTimestamp("endtime", endTimeParam);

        // Long literals keep the arithmetic in 64 bits whatever the getter's return type is.
        long retentionPeriodSeconds =
                PluginSettings.instance().getBatchMetricsRetentionPeriodMinutes() * 60L;
        long retentionPeriodMillis = retentionPeriodSeconds * 1000L;

        long samplingPeriod = DEFAULT_SAMPLING_PERIOD_MILLIS;
        if (samplingPeriodParam != null && !samplingPeriodParam.isEmpty()) {
            long samplingPeriodSeconds;
            try {
                samplingPeriodSeconds = Long.parseLong(samplingPeriodParam);
            } catch (NumberFormatException e) {
                // A malformed value is a client error (400), not a server failure (500).
                throw new InvalidParameterException(
                        String.format("%s is an invalid sampling period", samplingPeriodParam));
            }
            if (samplingPeriodSeconds < 5 || samplingPeriodSeconds % 5 != 0) {
                throw new InvalidParameterException(
                        String.format("%s is an invalid sampling period", samplingPeriodParam));
            }
            if (samplingPeriodSeconds >= retentionPeriodSeconds) {
                throw new InvalidParameterException(
                        "sampling period must be less than the retention period");
            }
            // Bounded by the retention period above, so the conversion cannot overflow.
            samplingPeriod = samplingPeriodSeconds * 1000;
        }
        if (startTime >= endTime) {
            throw new InvalidParameterException("starttime must be less than the endtime");
        }
        // Round both ends of the range down to a sampling period boundary.
        startTime -= startTime % samplingPeriod;
        endTime -= endTime % samplingPeriod;
        if (startTime == endTime) {
            throw new InvalidParameterException(
                    "starttime and endtime must be at least one sampling period apart");
        }
        if (endTime > currentTime) {
            throw new InvalidParameterException(
                    "endtime can be no greater than the system time at the node");
        }
        if (startTime < currentTime - retentionPeriodMillis) {
            throw new InvalidParameterException(
                    "starttime must be within the retention period");
        }
        long processingStartTime = System.currentTimeMillis();
        String queryResponse =
                queryFromBatchMetrics(
                        batchMetrics,
                        metrics,
                        startTime,
                        endTime,
                        samplingPeriod,
                        DEFAULT_MAX_DATAPOINTS);
        PerformanceAnalyzerApp.READER_METRICS_AGGREGATOR.updateStat(
                ReaderMetrics.BATCH_METRICS_QUERY_PROCESSING_TIME,
                "",
                System.currentTimeMillis() - processingStartTime);
        PerformanceAnalyzerApp.READER_METRICS_AGGREGATOR.updateStat(
                ReaderMetrics.BATCH_METRICS_HTTP_SUCCESS, "", 1);
        sendResponse(exchange, queryResponse, HttpURLConnection.HTTP_OK);
    } catch (DataAccessException e) {
        PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat(
                ExceptionsAndErrors.READER_METRICSDB_ACCESS_ERRORS, "", 1);
        LOG.error(
                "QueryException {} ExceptionCode: {}.",
                e,
                ReaderMetrics.BATCH_METRICS_HTTP_HOST_ERROR,
                e);
        PerformanceAnalyzerApp.READER_METRICS_AGGREGATOR.updateStat(
                ReaderMetrics.BATCH_METRICS_HTTP_HOST_ERROR, "", 1);
        String response = "{\"error\":\"" + escapeJsonString(e.toString()) + "\"}";
        sendResponse(exchange, response, HttpURLConnection.HTTP_INTERNAL_ERROR);
    } catch (InvalidParameterException e) {
        LOG.error(
                "QueryException {} ExceptionCode: {}.",
                e,
                ReaderMetrics.BATCH_METRICS_HTTP_CLIENT_ERROR,
                e);
        PerformanceAnalyzerApp.READER_METRICS_AGGREGATOR.updateStat(
                ReaderMetrics.BATCH_METRICS_HTTP_CLIENT_ERROR, "", 1);
        // The message may echo caller-supplied text, so it must be escaped.
        String response = "{\"error\":\"" + escapeJsonString(e.getMessage()) + ".\"}";
        sendResponse(exchange, response, HttpURLConnection.HTTP_BAD_REQUEST);
    } catch (Exception e) {
        LOG.error(
                "QueryException {} ExceptionCode: {}.",
                e,
                ReaderMetrics.BATCH_METRICS_HTTP_HOST_ERROR,
                e);
        PerformanceAnalyzerApp.READER_METRICS_AGGREGATOR.updateStat(
                ReaderMetrics.BATCH_METRICS_HTTP_HOST_ERROR, "", 1);
        String response = "{\"error\":\"" + escapeJsonString(e.toString()) + "\"}";
        sendResponse(exchange, response, HttpURLConnection.HTTP_INTERNAL_ERROR);
    }
}

/**
 * Parses a required, non-negative timestamp query parameter.
 *
 * @param name the parameter name, used only in error messages
 * @param rawValue the raw parameter value; may be {@code null}
 * @return the parsed value
 * @throws InvalidParameterException if the value is missing, empty, not an unsigned decimal
 *     number, or too large to be represented as a non-negative {@code long}
 */
private static long parseRequiredTimestamp(String name, String rawValue) {
    if (rawValue == null || rawValue.isEmpty()) {
        throw new InvalidParameterException(name + " parameter must be set");
    }
    long parsed;
    try {
        parsed = Long.parseUnsignedLong(rawValue);
    } catch (NumberFormatException e) {
        throw new InvalidParameterException(
                String.format("%s is an invalid %s", rawValue, name));
    }
    // parseUnsignedLong accepts values above Long.MAX_VALUE and returns them as negative
    // longs; the signed comparisons done by the caller would then be meaningless.
    if (parsed < 0) {
        throw new InvalidParameterException(
                String.format("%s is an invalid %s", rawValue, name));
    }
    return parsed;
}

/**
 * Escapes text so it can be placed between the double quotes of a JSON string literal.
 *
 * @param text the text to escape; {@code null} is rendered as the four characters
 *     {@code null}, matching plain string concatenation
 * @return the escaped text, without surrounding quotes
 */
private static String escapeJsonString(String text) {
    if (text == null) {
        return "null";
    }
    StringBuilder escaped = new StringBuilder(text.length() + 16);
    for (int i = 0; i < text.length(); i++) {
        char c = text.charAt(i);
        switch (c) {
            case '"':
                escaped.append("\\\"");
                break;
            case '\\':
                escaped.append("\\\\");
                break;
            case '\n':
                escaped.append("\\n");
                break;
            case '\r':
                escaped.append("\\r");
                break;
            case '\t':
                escaped.append("\\t");
                break;
            case '\b':
                escaped.append("\\b");
                break;
            case '\f':
                escaped.append("\\f");
                break;
            default:
                if (c < 0x20) {
                    // Remaining control characters need the six-character hex escape form.
                    escaped.append(String.format("\\u%04x", (int) c));
                } else {
                    escaped.append(c);
                }
        }
    }
    return escaped.toString();
}