in src/main/java/co/elastic/support/monitoring/MonitoringImportProcessor.java [64:155]
/**
 * Reads one extracted monitoring file and sends its contents to the target index in bulk batches.
 *
 * <p>The target index pattern is chosen from the file name ("logstash", "metricbeat", or the
 * general monitoring pattern otherwise) and its <code>{{suffix}}</code> token is replaced with
 * {@code inputs.targetSuffix}. Every non-blank line of the file is parsed as one JSON document.
 * The first document decides the <code>{{version}}</code> token and the bulk action: a document
 * without a {@code type} field targets version "8" with a {@code create} action, otherwise
 * version "7" with an {@code index} action.
 *
 * <p>When {@code updateClusterName} is set, a non-empty {@code cluster_name} and a non-empty
 * {@code cluster_settings.cluster.metadata.display_name} are overwritten with
 * {@code newClusterName} before the document is queued.
 *
 * <p>This method is best-effort: failures are logged and never propagated, and the number of
 * events written (as reported by {@code writeBatch}) is always logged at the end.
 *
 * @param file the extracted file to import; read as UTF-8, one JSON document per line
 */
public void process(File file) {
    logger.info(Constants.CONSOLE, "Processing: {}", file.getName());
    long eventsWritten = 0;
    // Both resources are closed automatically; the reader decodes explicitly as UTF-8 so the
    // result does not depend on the platform default charset.
    try (InputStream instream = new FileInputStream(file);
         BufferedReader br = new BufferedReader(
                 new InputStreamReader(instream, java.nio.charset.StandardCharsets.UTF_8))) {
        String indexName;
        if (file.getName().contains("logstash")) {
            indexName = config.logstashExtractIndexPattern.replace("{{suffix}}", inputs.targetSuffix);
        } else if (file.getName().contains("metricbeat")) {
            indexName = config.metricbeatExtractIndexPattern.replace("{{suffix}}", inputs.targetSuffix);
        } else {
            indexName = config.monitoringExtractIndexPattern.replace("{{suffix}}", inputs.targetSuffix);
        }
        StringBuilder batchBuilder = new StringBuilder();
        // Number of documents currently queued in batchBuilder.
        int batch = 0;
        Map<String, String> inputMeta = new LinkedHashMap<>();
        Map<String, Map<String, String>> inputAction = new LinkedHashMap<>();
        try {
            // Pre-fetch the first non-blank line to determine the source version.
            String contents = br.readLine();
            while (contents != null && contents.trim().isEmpty()) {
                contents = br.readLine();
            }
            if (contents == null) {
                // Nothing to import; the finally block still reports zero events written.
                logger.info(Constants.CONSOLE, "No events found in {}", file.getName());
                return;
            }
            ObjectNode sourceObject = JsonYamlUtils.mapper.readValue(contents, ObjectNode.class);
            if (StringUtils.isEmpty(sourceObject.path("type").asText())) {
                indexName = indexName.replace("{{version}}", "8");
                logger.info(Constants.CONSOLE, "Targeting {} because no type field is found", indexName);
                inputMeta.put("_index", indexName);
                inputAction.put("create", inputMeta);
            } else {
                indexName = indexName.replace("{{version}}", "7");
                logger.info(Constants.CONSOLE, "Targeting {} because type field is found", indexName);
                inputMeta.put("_index", indexName);
                inputAction.put("index", inputMeta);
            }
            // The action line is identical for every document in this file, so build it once.
            String actionLine = JsonYamlUtils.mapper.writeValueAsString(inputAction);
            while (contents != null) {
                // Skip blank lines (e.g. a trailing newline) instead of failing the whole file.
                if (contents.trim().isEmpty()) {
                    contents = br.readLine();
                    continue;
                }
                // If the cluster name is present and a replacement was requested, update it.
                sourceObject = JsonYamlUtils.mapper.readValue(contents, ObjectNode.class);
                String clusterName = sourceObject.path("cluster_name").asText();
                if (updateClusterName && StringUtils.isNotEmpty(clusterName)) {
                    sourceObject.put("cluster_name", newClusterName);
                }
                String altClusterName = sourceObject.path("cluster_settings").path("cluster").path("metadata").path("display_name").asText();
                if (updateClusterName && StringUtils.isNotEmpty(altClusterName)) {
                    sourceObject.with("cluster_settings").with("cluster").with("metadata").put("display_name", newClusterName);
                }
                String sourceLine = JsonYamlUtils.mapper.writeValueAsString(sourceObject);
                batchBuilder.append(actionLine).append('\n');
                batchBuilder.append(sourceLine).append('\n');
                // Count the document before testing the threshold so the count handed to
                // writeBatch always matches the number of documents in the payload.
                batch++;
                if (batch >= config.bulkSize) {
                    logger.info(Constants.CONSOLE, "Indexing document batch {} to {}", eventsWritten, eventsWritten + batch);
                    long docsWritten = writeBatch(batchBuilder.toString(), batch);
                    eventsWritten += docsWritten;
                    batch = 0;
                    batchBuilder.setLength(0);
                }
                contents = br.readLine();
            }
            // Flush whatever is left over after the last full batch.
            if (batch > 0) {
                logger.info(Constants.CONSOLE, "Indexing document batch {} to {}", eventsWritten, eventsWritten + batch);
                long docsWritten = writeBatch(batchBuilder.toString(), batch);
                eventsWritten += docsWritten;
            }
        } catch (Throwable t) {
            // Deliberately best-effort: if something goes wrong just log it and keep going.
            logger.error(Constants.CONSOLE, "Error processing JSON event for {}.", file.getName(), t);
        }
    } catch (Throwable t) {
        logger.error(Constants.CONSOLE, "Error processing entry - stream related error,", t);
    } finally {
        logger.info(Constants.CONSOLE, "{} events written from {}", eventsWritten, file.getName());
    }
}