in mr/src/main/java/org/elasticsearch/hadoop/handler/impl/elasticsearch/ElasticsearchHandler.java [117:192]
public void init(Properties properties) {
// Collect Handler settings
Settings handlerSettings = new PropertiesSettings(properties);
boolean inheritRoot = true;
if (handlerSettings.getProperty(CONF_CLIENT_INHERIT) != null) {
inheritRoot = Booleans.parseBoolean(handlerSettings.getProperty(CONF_CLIENT_INHERIT));
}
// Exception: Should persist transport pooling key if it is preset in the root config, regardless of the inherit settings.
if (SettingsUtils.hasJobTransportPoolingKey(rootSettings)) {
String jobKey = SettingsUtils.getJobTransportPoolingKey(rootSettings);
// We want to use a different job key based on the current one since error handlers might write
// to other clusters or have seriously different settings from the current rest client.
String newJobKey = jobKey + "_" + UUID.randomUUID().toString();
// Place under the client configuration for the handler
handlerSettings.setProperty(CONF_CLIENT_CONF + "." + InternalConfigurationOptions.INTERNAL_TRANSPORT_POOLING_KEY, newJobKey);
}
// Gather high level configs and push to client conf level
resolveProperty(CONF_CLIENT_NODES, CONF_CLIENT_CONF + "." + ConfigurationOptions.ES_NODES, handlerSettings);
resolveProperty(CONF_CLIENT_PORT, CONF_CLIENT_CONF + "." + ConfigurationOptions.ES_PORT, handlerSettings);
resolveProperty(CONF_CLIENT_RESOURCE, CONF_CLIENT_CONF + "." + ConfigurationOptions.ES_RESOURCE_WRITE, handlerSettings);
resolveProperty(CONF_CLIENT_RESOURCE, CONF_CLIENT_CONF + "." + ConfigurationOptions.ES_RESOURCE, handlerSettings);
// Inherit the original configuration or not
this.clientSettings = handlerSettings.getSettingsView(CONF_CLIENT_CONF);
// Ensure we have a write resource to use
Assert.hasText(clientSettings.getResourceWrite(), "Could not locate write resource for ES error handler.");
if (inheritRoot) {
LOG.info("Elasticsearch Error Handler inheriting root configuration");
this.clientSettings = new CompositeSettings(Arrays.asList(clientSettings, rootSettings.excludeFilter("es.internal")));
} else {
LOG.info("Elasticsearch Error Handler proceeding without inheriting root configuration options as configured");
}
// Ensure no pattern in Index format, and extract the index to send errors to
InitializationUtils.discoverAndValidateClusterInfo(clientSettings, LOG);
Resource resource = new Resource(clientSettings, false);
IndexExtractor iformat = ObjectUtils.instantiate(clientSettings.getMappingIndexExtractorClassName(), handlerSettings);
iformat.compile(resource.toString());
if (iformat.hasPattern()) {
throw new IllegalArgumentException(String.format("Cannot use index format within Elasticsearch Error Handler. Format was [%s]", resource.toString()));
}
this.endpoint = resource;
// Configure ECS
ElasticCommonSchema schema = new ElasticCommonSchema();
TemplateBuilder templateBuilder = schema.buildTemplate()
.setEventCategory(CONST_EVENT_CATEGORY);
// Add any Labels and Tags to schema
for (Map.Entry entry: handlerSettings.getSettingsView(CONF_LABEL).asProperties().entrySet()) {
templateBuilder.addLabel(entry.getKey().toString(), entry.getValue().toString());
}
templateBuilder.addTags(StringUtils.tokenize(handlerSettings.getProperty(CONF_TAGS)));
// Configure template using event handler
templateBuilder = eventConverter.configureTemplate(templateBuilder);
this.messageTemplate = templateBuilder.build();
// Determine the behavior for successful write and error on write:
this.returnDefault = HandlerResult.valueOf(handlerSettings.getProperty(CONF_RETURN_VALUE, CONF_RETURN_VALUE_DEFAULT));
if (HandlerResult.PASS == returnDefault) {
this.successReason = handlerSettings.getProperty(CONF_RETURN_VALUE + "." + CONF_PASS_REASON_SUFFIX);
} else {
this.successReason = null;
}
this.returnError = HandlerResult.valueOf(handlerSettings.getProperty(CONF_RETURN_ERROR, CONF_RETURN_ERROR_DEFAULT));
if (HandlerResult.PASS == returnError) {
this.errorReason = handlerSettings.getProperty(CONF_RETURN_ERROR + "." + CONF_PASS_REASON_SUFFIX);
} else {
this.errorReason = null;
}
}