public void handle()

in src/main/java/org/opensearch/performanceanalyzer/rest/QueryMetricsRequestHandler.java [88:235]


    /**
     * Serves a metrics query received over HTTP.
     *
     * <p>Processing order:
     *
     * <ol>
     *   <li>Responds with 503 when the {@link ReaderMetricsProcessor} singleton or its current
     *       metrics database entry is not available.
     *   <li>Responds with 404 (no body) for any request method other than GET.
     *   <li>Delegates to {@code getMetricUnits} when the request is a unit look-up.
     *   <li>Otherwise parses the {@code nodes}, {@code metrics}, {@code agg} and {@code dim} query
     *       parameters, queries the local metrics database and, when {@code nodes=all} and more
     *       than one cluster instance is known, also collects the responses of the other
     *       instances before replying.
     * </ol>
     *
     * <p>An {@link InvalidParameterException} raised while handling the query is reported as 400;
     * any other exception is reported as 500. In both cases the failure is logged and counted
     * under {@code StatExceptionCode.REQUEST_ERROR}.
     *
     * @param exchange the HTTP exchange carrying the request and used to write the response
     * @throws IOException if writing the response fails
     */
    public void handle(HttpExchange exchange) throws IOException {
        String requestMethod = exchange.getRequestMethod();
        LOG.info("{} {} {}", requestMethod, exchange.getRemoteAddress(), exchange.getRequestURI());

        ReaderMetricsProcessor mp = ReaderMetricsProcessor.getInstance();
        if (mp == null) {
            sendResponse(
                    exchange,
                    "{\"error\":\"Metrics Processor is not initialized. The reader has run into an issue or has just started.\"}",
                    HttpURLConnection.HTTP_UNAVAILABLE);

            LOG.warn(
                    "Metrics Processor is not initialized. The reader has run into an issue or has just started.");
            return;
        }

        Map.Entry<Long, MetricsDB> dbEntry = mp.getMetricsDB();
        if (dbEntry == null) {
            sendResponse(
                    exchange,
                    "{\"error\":\"There are no metrics databases. The reader has run into an issue or has just started.\"}",
                    HttpURLConnection.HTTP_UNAVAILABLE);

            LOG.warn(
                    "There are no metrics databases. The reader has run into an issue or has just started.");
            return;
        }
        // The entry's value may be null; every use of db below must tolerate that.
        MetricsDB db = dbEntry.getValue();
        Long dbTimestamp = dbEntry.getKey();

        // Only GET is served. This check deliberately stays after the availability checks so
        // that non-GET requests still receive 503 while the reader is not ready.
        if (!requestMethod.equalsIgnoreCase("GET")) {
            exchange.sendResponseHeaders(HttpURLConnection.HTTP_NOT_FOUND, -1);
            exchange.close();
            return;
        }

        LOG.debug("Query handler called.");

        if (isUnitLookUp(exchange)) {
            getMetricUnits(exchange);
            return;
        }

        Map<String, String> params = getParamsMap(exchange.getRequestURI().getQuery());

        exchange.getResponseHeaders().set("Content-Type", "application/json");
        try {
            String nodes = params.get("nodes");
            List<String> metricList = metricsRestUtil.parseArrayParam(params, "metrics", false);
            List<String> aggList = metricsRestUtil.parseArrayParam(params, "agg", false);
            List<String> dimList = metricsRestUtil.parseArrayParam(params, "dim", true);

            if (metricList.size() != aggList.size()) {
                sendResponse(
                        exchange,
                        "{\"error\":\"metrics/aggregations should have the same number of entries.\"}",
                        HttpURLConnection.HTTP_BAD_REQUEST);
                return;
            }

            // validParams receives the exchange; when it returns false nothing more is sent here.
            if (!validParams(exchange, metricList, dimList, aggList)) {
                return;
            }

            // Default to an empty JSON object when there is no database or no query result.
            String localResponse = "{}";
            if (db != null) {
                Result<Record> metricResult = db.queryMetric(metricList, aggList, dimList);
                if (metricResult != null) {
                    localResponse = metricResult.formatJSON();
                }
            }

            String localResponseWithTimestamp =
                    String.format(
                            "{\"timestamp\": %d, \"data\": %s}", dbTimestamp, localResponse);
            ConcurrentHashMap<String, String> nodeResponses = new ConcurrentHashMap<>();
            final List<InstanceDetails> allNodes = appContext.getAllClusterInstances();
            // The first cluster instance is used as the id of the local node.
            String localNodeId = "local";
            if (!allNodes.isEmpty()) {
                localNodeId = allNodes.get(0).getInstanceId().toString();
            }
            nodeResponses.put(localNodeId, localResponseWithTimestamp);
            String response = metricsRestUtil.nodeJsonBuilder(nodeResponses);

            if (nodes == null || !nodes.equals("all") || allNodes.size() <= 1) {
                sendResponse(exchange, response, HttpURLConnection.HTTP_OK);
            } else {
                // Reaching here implies nodes equals "all" and at least one other instance exists.
                // One latch count per remote instance (index 0 is the local node).
                CountDownLatch doneSignal = new CountDownLatch(allNodes.size() - 1);
                for (int i = 1; i < allNodes.size(); i++) {
                    InstanceDetails node = allNodes.get(i);
                    LOG.debug("Collecting remote stats");
                    try {
                        collectRemoteStats(
                                node, metricList, aggList, dimList, nodeResponses, doneSignal);
                    } catch (Exception e) {
                        LOG.error(
                                "Unable to collect stats for node, addr:{}, exception: {} ExceptionCode: {}",
                                node.getInstanceIp(),
                                e,
                                StatExceptionCode.REQUEST_REMOTE_ERROR.toString());
                        StatsCollector.instance()
                                .logException(StatExceptionCode.REQUEST_REMOTE_ERROR);
                    }
                }
                boolean completed = doneSignal.await(TIME_OUT_VALUE, TIME_OUT_UNIT);
                if (!completed) {
                    LOG.debug("Timeout while collecting remote stats");
                    StatsCollector.instance()
                            .logException(StatExceptionCode.REQUEST_REMOTE_ERROR);
                }
                // Reply with whatever responses were gathered, even after a timeout.
                sendResponseWhenRequestCompleted(nodeResponses, exchange);
            }
        } catch (InvalidParameterException e) {
            reportQueryFailure(
                    exchange, db, e, e.getMessage(), HttpURLConnection.HTTP_BAD_REQUEST);
        } catch (Exception e) {
            try {
                reportQueryFailure(
                        exchange, db, e, e.toString(), HttpURLConnection.HTTP_INTERNAL_ERROR);
            } finally {
                if (e instanceof InterruptedException) {
                    // Restore the interrupt status that the catch would otherwise swallow.
                    // Done last so the pending interrupt cannot disturb writing the response.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Logs a failed query, records it under {@code StatExceptionCode.REQUEST_ERROR} and sends a
     * JSON error body of the form {@code {"error":"<detail>"}} with the given status code.
     *
     * @param exchange the HTTP exchange used to write the error response
     * @param db the metrics database in use when the failure happened; may be {@code null}
     * @param e the exception that caused the failure
     * @param detail text placed in the {@code error} field; escaped before being embedded
     * @param statusCode the HTTP status code to respond with
     * @throws IOException if writing the response fails
     */
    private void reportQueryFailure(
            HttpExchange exchange, MetricsDB db, Exception e, String detail, int statusCode)
            throws IOException {
        // db can legitimately be null here, so never dereference it unguarded.
        LOG.error("DB file path : {}", db != null ? db.getDBFilePath() : "<none>");
        LOG.error(
                (Supplier<?>)
                        () ->
                                new ParameterizedMessage(
                                        "QueryException {} ExceptionCode: {}.",
                                        e.toString(),
                                        StatExceptionCode.REQUEST_ERROR.toString()),
                e);
        StatsCollector.instance().logException(StatExceptionCode.REQUEST_ERROR);
        String response = "{\"error\":\"" + escapeJsonText(detail) + "\"}";
        sendResponse(exchange, response, statusCode);
    }

    /**
     * Escapes text so it can be embedded between double quotes in a JSON document: quotes,
     * backslashes and control characters below U+0020 are replaced by their escape sequences.
     *
     * @param raw the text to escape; {@code null} is rendered as the four characters {@code null}
     * @return the escaped text, without surrounding quotes
     */
    private static String escapeJsonText(String raw) {
        String text = String.valueOf(raw);
        StringBuilder escaped = new StringBuilder(text.length() + 16);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '"':
                    escaped.append("\\\"");
                    break;
                case '\\':
                    escaped.append("\\\\");
                    break;
                case '\n':
                    escaped.append("\\n");
                    break;
                case '\r':
                    escaped.append("\\r");
                    break;
                case '\t':
                    escaped.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        escaped.append(String.format("\\u%04x", (int) c));
                    } else {
                        escaped.append(c);
                    }
            }
        }
        return escaped.toString();
    }