public void init()

in mr/src/main/java/org/elasticsearch/hadoop/handler/impl/elasticsearch/ElasticsearchHandler.java [117:192]


    /**
     * Initializes this error handler from the given handler configuration.
     * <p>
     * The method performs the following steps, in order:
     * <ol>
     *   <li>builds the client settings for the handler from the handler properties, optionally
     *       layering them over the root settings;</li>
     *   <li>resolves the write resource and rejects resources that contain an index pattern;</li>
     *   <li>builds the ECS message template, including any configured labels and tags;</li>
     *   <li>reads the handler results (and pass reasons) to use after a successful write and after
     *       a failed write.</li>
     * </ol>
     *
     * @param properties the handler-level configuration properties
     * @throws IllegalArgumentException if the resolved resource contains an index format pattern
     */
    public void init(Properties properties) {
        // Collect Handler settings
        Settings handlerSettings = new PropertiesSettings(properties);

        // Root settings are inherited unless the handler configuration explicitly sets the flag
        String inheritValue = handlerSettings.getProperty(CONF_CLIENT_INHERIT);
        boolean inheritRoot = (inheritValue == null) || Booleans.parseBoolean(inheritValue);

        // Exception: Should persist transport pooling key if it is preset in the root config, regardless of the inherit settings.
        if (SettingsUtils.hasJobTransportPoolingKey(rootSettings)) {
            String jobKey = SettingsUtils.getJobTransportPoolingKey(rootSettings);
            // We want to use a different job key based on the current one since error handlers might write
            // to other clusters or have seriously different settings from the current rest client.
            String newJobKey = jobKey + "_" + UUID.randomUUID().toString();
            // Place under the client configuration for the handler
            handlerSettings.setProperty(CONF_CLIENT_CONF + "." + InternalConfigurationOptions.INTERNAL_TRANSPORT_POOLING_KEY, newJobKey);
        }

        // Gather high level configs and push to client conf level
        resolveProperty(CONF_CLIENT_NODES, CONF_CLIENT_CONF + "." + ConfigurationOptions.ES_NODES, handlerSettings);
        resolveProperty(CONF_CLIENT_PORT, CONF_CLIENT_CONF + "." + ConfigurationOptions.ES_PORT, handlerSettings);
        // The handler resource is pushed to both the write-specific and the general resource keys
        resolveProperty(CONF_CLIENT_RESOURCE, CONF_CLIENT_CONF + "." + ConfigurationOptions.ES_RESOURCE_WRITE, handlerSettings);
        resolveProperty(CONF_CLIENT_RESOURCE, CONF_CLIENT_CONF + "." + ConfigurationOptions.ES_RESOURCE, handlerSettings);

        // Start from the client-level view of the handler's own settings
        this.clientSettings = handlerSettings.getSettingsView(CONF_CLIENT_CONF);

        // Ensure we have a write resource to use. This runs before the root settings are layered in,
        // so only the handler's own client settings are consulted here.
        Assert.hasText(clientSettings.getResourceWrite(), "Could not locate write resource for ES error handler.");

        // Inherit the original configuration or not
        if (inheritRoot) {
            LOG.info("Elasticsearch Error Handler inheriting root configuration");
            // Handler client settings are listed first, followed by the root settings with the
            // "es.internal" entries filtered out.
            // NOTE(review): presumably earlier entries win on conflicts - confirm against CompositeSettings.
            this.clientSettings = new CompositeSettings(Arrays.asList(clientSettings, rootSettings.excludeFilter("es.internal")));
        } else {
            LOG.info("Elasticsearch Error Handler proceeding without inheriting root configuration options as configured");
        }

        // Ensure no pattern in Index format, and extract the index to send errors to
        InitializationUtils.discoverAndValidateClusterInfo(clientSettings, LOG);
        Resource resource = new Resource(clientSettings, false);
        String resourceName = resource.toString();
        IndexExtractor iformat = ObjectUtils.instantiate(clientSettings.getMappingIndexExtractorClassName(), handlerSettings);
        iformat.compile(resourceName);
        if (iformat.hasPattern()) {
            throw new IllegalArgumentException(String.format("Cannot use index format within Elasticsearch Error Handler. Format was [%s]", resourceName));
        }
        this.endpoint = resource;

        // Configure ECS
        ElasticCommonSchema schema = new ElasticCommonSchema();
        TemplateBuilder templateBuilder = schema.buildTemplate()
                .setEventCategory(CONST_EVENT_CATEGORY);

        // Add any Labels and Tags to schema
        for (Map.Entry<?, ?> entry : handlerSettings.getSettingsView(CONF_LABEL).asProperties().entrySet()) {
            templateBuilder.addLabel(entry.getKey().toString(), entry.getValue().toString());
        }
        templateBuilder.addTags(StringUtils.tokenize(handlerSettings.getProperty(CONF_TAGS)));

        // Configure template using event handler
        templateBuilder = eventConverter.configureTemplate(templateBuilder);
        this.messageTemplate = templateBuilder.build();

        // Determine the behavior for successful write and error on write.
        // A reason is only read when the corresponding result is PASS; otherwise it is cleared.
        this.returnDefault = HandlerResult.valueOf(handlerSettings.getProperty(CONF_RETURN_VALUE, CONF_RETURN_VALUE_DEFAULT));
        if (HandlerResult.PASS == returnDefault) {
            this.successReason = handlerSettings.getProperty(CONF_RETURN_VALUE + "." + CONF_PASS_REASON_SUFFIX);
        } else {
            this.successReason = null;
        }
        this.returnError = HandlerResult.valueOf(handlerSettings.getProperty(CONF_RETURN_ERROR, CONF_RETURN_ERROR_DEFAULT));
        if (HandlerResult.PASS == returnError) {
            this.errorReason = handlerSettings.getProperty(CONF_RETURN_ERROR + "." + CONF_PASS_REASON_SUFFIX);
        } else {
            this.errorReason = null;
        }
    }