in flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java [354:379]
public void start() {
ElasticSearchClientFactory clientFactory = new ElasticSearchClientFactory();
logger.info("ElasticSearch sink {} started");
sinkCounter.start();
try {
if (isLocal) {
client = clientFactory.getLocalClient(
clientType, eventSerializer, indexRequestFactory);
} else {
client = clientFactory.getClient(clientType, serverAddresses,
clusterName, eventSerializer, indexRequestFactory);
client.configure(elasticSearchClientContext);
}
sinkCounter.incrementConnectionCreatedCount();
} catch (Exception ex) {
ex.printStackTrace();
sinkCounter.incrementConnectionFailedCount();
if (client != null) {
client.close();
sinkCounter.incrementConnectionClosedCount();
}
}
super.start();
}