in flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java [244:351]
/**
 * Reads the sink settings from the supplied context and prepares the
 * collaborators (serializer or index request factory, index name builder,
 * sink counter) used by this sink.
 *
 * <p>Optional settings overwrite the current field value only when the
 * corresponding property is present and non-blank. Unless the sink is
 * running in local mode, at least one host name must be available.
 *
 * @param context the configuration to read properties from
 * @throws IllegalStateException if a required parameter is missing or a
 *         numeric parameter fails its range check
 * @throws RuntimeException if the serializer or the index name builder
 *         cannot be instantiated or configured (the failure is logged and
 *         then propagated via {@code Throwables.propagate})
 */
public void configure(Context context) {
  if (!isLocal) {
    if (StringUtils.isNotBlank(context.getString(HOSTNAMES))) {
      // Whitespace is stripped first so "a, b" and "a,b" split identically.
      serverAddresses = StringUtils.deleteWhitespace(
          context.getString(HOSTNAMES)).split(",");
    }
    Preconditions.checkState(serverAddresses != null
        && serverAddresses.length > 0, "Missing Param:" + HOSTNAMES);
  }

  if (StringUtils.isNotBlank(context.getString(INDEX_NAME))) {
    this.indexName = context.getString(INDEX_NAME);
  }

  if (StringUtils.isNotBlank(context.getString(INDEX_TYPE))) {
    this.indexType = context.getString(INDEX_TYPE);
  }

  if (StringUtils.isNotBlank(context.getString(CLUSTER_NAME))) {
    this.clusterName = context.getString(CLUSTER_NAME);
  }

  if (StringUtils.isNotBlank(context.getString(BATCH_SIZE))) {
    this.batchSize = Integer.parseInt(context.getString(BATCH_SIZE));
  }

  if (StringUtils.isNotBlank(context.getString(TTL))) {
    this.ttlMs = parseTTL(context.getString(TTL));
    Preconditions.checkState(ttlMs > 0, TTL
        + " must be greater than 0 or not set.");
  }

  if (StringUtils.isNotBlank(context.getString(CLIENT_TYPE))) {
    clientType = context.getString(CLIENT_TYPE);
  }

  // Properties under the client prefix are kept in their own context.
  elasticSearchClientContext = new Context();
  elasticSearchClientContext.putAll(context.getSubProperties(CLIENT_PREFIX));

  String serializerClazz = DEFAULT_SERIALIZER_CLASS;
  if (StringUtils.isNotBlank(context.getString(SERIALIZER))) {
    serializerClazz = context.getString(SERIALIZER);
  }

  Context serializerContext = new Context();
  serializerContext.putAll(context.getSubProperties(SERIALIZER_PREFIX));

  try {
    @SuppressWarnings("unchecked")
    Class<? extends Configurable> clazz = (Class<? extends Configurable>) Class
        .forName(serializerClazz);
    Configurable serializer = clazz.newInstance();

    // The configured class may be either a request builder factory or a
    // plain event serializer; anything else is rejected.
    if (serializer instanceof ElasticSearchIndexRequestBuilderFactory) {
      indexRequestFactory
          = (ElasticSearchIndexRequestBuilderFactory) serializer;
      indexRequestFactory.configure(serializerContext);
    } else if (serializer instanceof ElasticSearchEventSerializer) {
      eventSerializer = (ElasticSearchEventSerializer) serializer;
      eventSerializer.configure(serializerContext);
    } else {
      throw new IllegalArgumentException(serializerClazz
          + " is not an ElasticSearchEventSerializer");
    }
  } catch (Exception e) {
    logger.error("Could not instantiate event serializer.", e);
    Throwables.propagate(e);
  }

  if (sinkCounter == null) {
    sinkCounter = new SinkCounter(getName());
  }

  String indexNameBuilderClass = DEFAULT_INDEX_NAME_BUILDER_CLASS;
  if (StringUtils.isNotBlank(context.getString(INDEX_NAME_BUILDER))) {
    indexNameBuilderClass = context.getString(INDEX_NAME_BUILDER);
  }

  // The builder's own sub-properties belong in the builder's context.
  // Previously they were copied into serializerContext, so the index name
  // builder never received them.
  Context indexnameBuilderContext = new Context();
  indexnameBuilderContext.putAll(
      context.getSubProperties(INDEX_NAME_BUILDER_PREFIX));

  try {
    @SuppressWarnings("unchecked")
    Class<? extends IndexNameBuilder> clazz
        = (Class<? extends IndexNameBuilder>) Class
            .forName(indexNameBuilderClass);
    indexNameBuilder = clazz.newInstance();
    // Put after the sub-properties so the sink-level index name wins.
    indexnameBuilderContext.put(INDEX_NAME, indexName);
    indexNameBuilder.configure(indexnameBuilderContext);
  } catch (Exception e) {
    logger.error("Could not instantiate index name builder.", e);
    Throwables.propagate(e);
  }

  // Final validation of the values the sink cannot operate without.
  Preconditions.checkState(StringUtils.isNotBlank(indexName),
      "Missing Param:" + INDEX_NAME);
  Preconditions.checkState(StringUtils.isNotBlank(indexType),
      "Missing Param:" + INDEX_TYPE);
  Preconditions.checkState(StringUtils.isNotBlank(clusterName),
      "Missing Param:" + CLUSTER_NAME);
  Preconditions.checkState(batchSize >= 1, BATCH_SIZE
      + " must be greater than 0");
}