public void process(File file)

in src/main/java/co/elastic/support/monitoring/MonitoringImportProcessor.java [64:155]


    public void process(File file) {

        logger.info(Constants.CONSOLE, "Processing: {}", file.getName());
        long eventsWritten = 0;
        String indexName;

        try (InputStream instream = new FileInputStream(file)) {
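            // Pick the target index pattern based on the source file name; {{suffix}} is replaced with the user-supplied target suffix.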

            if (file.getName().contains("logstash")) {
                indexName = config.logstashExtractIndexPattern.replace("{{suffix}}", inputs.targetSuffix);
            } else if (file.getName().contains("metricbeat")) {
                indexName = config.metricbeatExtractIndexPattern.replace("{{suffix}}", inputs.targetSuffix);
            } else {
                indexName = config.monitoringExtractIndexPattern.replace("{{suffix}}", inputs.targetSuffix);
            }

            BufferedReader br = new BufferedReader(new InputStreamReader(instream));
            StringBuilder batchBuilder = new StringBuilder();
            String contents;
            int batch = 0;
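            // Metadata maps for the bulk action line: the operation ("index" or "create") and the target index.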
            Map<String, String> inputMeta = new LinkedHashMap<>();
            Map<String, Map<String, String>> inputAction = new LinkedHashMap<>();

            try {
                // Pre-fetch the first line to determine the source version from the presence of a type field
                contents = br.readLine();
                ObjectNode sourceObject = JsonYamlUtils.mapper.readValue(contents, ObjectNode.class);
                if (StringUtils.isEmpty(sourceObject.path("type").asText())) {
                    indexName = indexName.replace("{{version}}", "8");
                    logger.info(Constants.CONSOLE, "Targeting {} because no type field was found", indexName);
                    inputMeta.put("_index", indexName);
                    inputAction.put("create", inputMeta);
                } else {
                    indexName = indexName.replace("{{version}}", "7");
                    logger.info(Constants.CONSOLE, "Targeting {} because a type field was found", indexName);
                    inputMeta.put("_index", indexName);
                    inputAction.put("index", inputMeta);
                }

                String actionLine = JsonYamlUtils.mapper.writeValueAsString(inputAction);
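                // The pre-fetched first line is re-parsed by the loop below, so it is indexed along with the rest.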

                while (contents != null) {
                    // If a cluster name is present and a rename was requested, update it
                    sourceObject = JsonYamlUtils.mapper.readValue(contents, ObjectNode.class);
                    String clusterName = sourceObject.path("cluster_name").asText();

                    if (updateClusterName && StringUtils.isNotEmpty(clusterName)) {
                        sourceObject.put("cluster_name", newClusterName);
                    }

                    String altClusterName = sourceObject.path("cluster_settings").path("cluster").path("metadata").path("display_name").asText();
                    if (updateClusterName && StringUtils.isNotEmpty(altClusterName)) {
                        sourceObject.with("cluster_settings").with("cluster").with("metadata").put("display_name", newClusterName);
                    }

                    String sourceLine = JsonYamlUtils.mapper.writeValueAsString(sourceObject);
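                    // Bulk NDJSON format: the action line, then the document source, each on its own line.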

                    batchBuilder.append(actionLine).append("\n");
                    batchBuilder.append(sourceLine).append("\n");

                    // See if we need to flush the current batch
                    if (batch >= config.bulkSize) {
                        logger.info(Constants.CONSOLE, "Indexing document batch {} to {}", eventsWritten, eventsWritten + batch);

                        long docsWritten = writeBatch(batchBuilder.toString(), batch);
                        eventsWritten += docsWritten;
                        batch = 0;
                        batchBuilder.setLength(0);
                    } else {
                        batch++;
                    }
                    contents = br.readLine();
                }

                // If anything is left in the buffer, flush the final partial batch
                if (batch > 0) {
                    logger.info(Constants.CONSOLE, "Indexing document batch {} to {}", eventsWritten, eventsWritten + batch);
                    long docsWritten = writeBatch(batchBuilder.toString(), batch);
                    eventsWritten += docsWritten;
                }

            } catch (Throwable t) {
                // If something goes wrong, just log it and keep going.
                logger.error(Constants.CONSOLE, "Error processing JSON event for {}.", file.getName(), t);
            }
        } catch (Throwable t) {
            logger.error(Constants.CONSOLE, "Error processing entry - stream related error.", t);
        } finally {
            logger.info(Constants.CONSOLE, "{} events written from {}", eventsWritten, file.getName());
        }

    }
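
Note: writeBatch (not shown in this excerpt) receives the NDJSON body assembled above and indexes it, presumably through Elasticsearch's _bulk API. As a point of reference only, the following is a minimal, hypothetical sketch of submitting such a body with the JDK HTTP client; the endpoint URL, index name, and document content are illustrative and not taken from this class.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BulkSubmitSketch {
    public static void main(String[] args) throws Exception {
        // Two NDJSON lines per document: the action line, then the source document (illustrative values).
        String bulkBody =
                "{\"index\":{\"_index\":\"example-monitoring-index\"}}\n"
              + "{\"cluster_name\":\"example-cluster\",\"timestamp\":\"2024-01-01T00:00:00Z\"}\n";

        // Hypothetical local endpoint; the real target and credentials come from the tool's configuration.
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:9200/_bulk"))
                .header("Content-Type", "application/x-ndjson")
                .POST(HttpRequest.BodyPublishers.ofString(bulkBody))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + ": " + response.body());
    }
}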