in src/main/java/co/elastic/support/monitoring/MonitoringExportService.java [222:324]
/**
 * Exports each stat configured for {@code inputs.type} into its own JSON file under
 * {@code tempDir}, paging through the result set with scroll requests.
 *
 * <p>For every stat the query template and start URI are chosen from {@code config}
 * (index_stats, logstash sets, metricbeat sets, or the general query), the template
 * placeholders are filled in from {@code inputs}, and the hits of every page are handed to
 * {@code processHits}. A failure on one stat is logged and the loop moves on to the next stat.
 *
 * @param tempDir   directory the per-stat {@code .json} files are written to
 * @param client    REST client used for the search, scroll and scroll-delete calls
 * @param config    supplies the stat list, query templates, scroll size and stat groupings
 * @param inputs    supplies the monitoring type, cluster id and the start/stop of the time range
 * @param restCalls REST entries; the {@code monitoring-start-scroll-uri},
 *                  {@code metricbeat-start-scroll-uri} and {@code monitoring-scroll-uri}
 *                  keys are looked up here
 */
private void runExportQueries(String tempDir, RestClient client, MonitoringExportConfig config,
        MonitoringExportInputs inputs, Map<String, RestEntry> restCalls) {
    // Get the monitoring stats labels and the URIs shared by every stat.
    List<String> statsFields = config.getStatsByType(inputs.type);
    String monitoringScroll = Long.toString(config.monitoringScrollSize);
    String monitoringStartUri = restCalls.get("monitoring-start-scroll-uri").getUrl();
    String metricbeatStartUri = restCalls.get("metricbeat-start-scroll-uri").getUrl();
    String monitoringScrollUri = restCalls.get("monitoring-scroll-uri").getUrl();

    for (String stat : statsFields) {
        logger.info(Constants.CONSOLE, "Now extracting {}...", stat);

        // Pick the query template, the start URI and the output file for this stat.
        String startUri;
        String statFile;
        String query;
        if (stat.equalsIgnoreCase("index_stats")) {
            query = config.queries.get("index_stats");
            startUri = monitoringStartUri.replace("{{type}}", "es");
            statFile = tempDir + SystemProperties.fileSeparator + stat + ".json";
        } else if (config.logstashSets.contains(stat)) {
            query = config.queries.get("general");
            startUri = monitoringStartUri.replace("{{type}}", "logstash");
            statFile = tempDir + SystemProperties.fileSeparator + stat + ".json";
        } else if (config.metricSets.contains(stat)) {
            query = config.queries.get("metricbeat");
            startUri = metricbeatStartUri;
            statFile = tempDir + SystemProperties.fileSeparator + "metricbeat-" + stat + ".json";
        } else {
            query = config.queries.get("general");
            startUri = monitoringStartUri.replace("{{type}}", "es");
            statFile = tempDir + SystemProperties.fileSeparator + stat + ".json";
        }

        // The "shards" stat is queried through the singular field name.
        String field = stat.equalsIgnoreCase("shards") ? "shard" : stat;
        query = query.replace("{{type}}", stat);
        query = query.replace("{{field}}", field);
        query = query.replace("{{size}}", monitoringScroll);
        query = query.replace("{{start}}", inputs.queryStartDate);
        query = query.replace("{{stop}}", inputs.queryEndDate);
        query = query.replace("{{clusterId}}", inputs.clusterId);

        // Tracked outside the try so the scroll can be released on every exit path.
        String scrollId = null;
        try {
            RestResult restResult = new RestResult(client.execPost(startUri, query), startUri);
            if (restResult.getStatus() != 200) {
                logger.error(Constants.CONSOLE,
                        "Initial retrieve for stat: {} failed with status: {}, reason: {}, bypassing and going to next call.",
                        stat, restResult.getStatus(), restResult.getReason());
                logger.error(Constants.CONSOLE, "Bypassing.");
                continue;
            }

            JsonNode resultNode = JsonYamlUtils.createJsonNodeFromString(restResult.toString());
            scrollId = resultNode.path("_scroll_id").asText();

            // Elasticsearch 7+ reports hits.total as an object ({"value": n, "relation": ...})
            // unless rest_total_hits_as_int is requested; older versions return a plain number.
            // Accept both shapes so a 7+ response is not mistaken for "no documents".
            JsonNode totalNode = resultNode.path("hits").path("total");
            long totalHits = totalNode.isObject()
                    ? totalNode.path("value").asLong(0)
                    : totalNode.asLong(0);

            // If there are no hits, move to the next stat without creating a file.
            if (totalHits <= 0) {
                logger.info(Constants.CONSOLE, "No documents found for: {}.", stat);
                continue;
            }
            logger.info(Constants.CONSOLE, "{} documents retrieved. Writing to disk.", totalHits);

            try (PrintWriter pw = new PrintWriter(statFile)) {
                ArrayNode hitsNode = getHitsArray(resultNode);
                long processedHits = 0;
                do {
                    // The initial response already carries the first page, so write before scrolling.
                    processHits(hitsNode, pw);
                    processedHits += hitsNode.size();
                    logger.info(Constants.CONSOLE, "{} of {} processed.", processedHits, totalHits);

                    String scrollQuery = SCROLL_ID.replace("{{scrollId}}", scrollId);
                    RestResult scrollResult = new RestResult(client.execPost(monitoringScrollUri, scrollQuery),
                            monitoringScrollUri);
                    if (scrollResult.getStatus() != 200) {
                        logger.error(Constants.CONSOLE,
                                "Scroll for stat: {} Operation failed with status: {}, reason: {}, bypassing and going to next call.",
                                stat, scrollResult.getStatus(), scrollResult.getReason());
                        // Stop paging: retrying would rewrite the page we already have, forever.
                        break;
                    }

                    resultNode = JsonYamlUtils.createJsonNodeFromString(scrollResult.toString());
                    // Keep the last known id if a page does not carry one, so cleanup still works.
                    String nextScrollId = resultNode.path("_scroll_id").asText();
                    if (!nextScrollId.isEmpty()) {
                        scrollId = nextScrollId;
                    }
                    hitsNode = getHitsArray(resultNode);
                } while (hitsNode.size() != 0);

                // PrintWriter never throws on write failures; surface them explicitly.
                if (pw.checkError()) {
                    logger.error(Constants.CONSOLE, "Errors occurred while writing {} - the file may be incomplete.",
                            statFile);
                }
            }
        } catch (Exception e) {
            logger.error("Error extracting information from {}", stat, e);
        } finally {
            // Delete the scroll to free up the resources, whichever way this stat ended.
            if (scrollId != null && !scrollId.isEmpty()) {
                try {
                    client.execDelete("/_search/scroll/" + scrollId);
                } catch (Exception e) {
                    logger.warn("Unable to delete the scroll for {}", stat, e);
                }
            }
        }
    }
}