in flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java [117:148]
public void execute() throws Exception {
int statusCode = 0, triesCount = 0;
HttpResponse response = null;
String entity;
synchronized (bulkBuilder) {
entity = bulkBuilder.toString();
bulkBuilder = new StringBuilder();
}
while (statusCode != HttpStatus.SC_OK && triesCount < serversList.size()) {
triesCount++;
String host = serversList.get();
String url = host + "/" + BULK_ENDPOINT;
HttpPost httpRequest = new HttpPost(url);
httpRequest.setEntity(new StringEntity(entity));
response = httpClient.execute(httpRequest);
statusCode = response.getStatusLine().getStatusCode();
logger.info("Status code from elasticsearch: " + statusCode);
if (response.getEntity() != null) {
logger.debug("Status message from elasticsearch: " +
EntityUtils.toString(response.getEntity(), "UTF-8"));
}
}
if (statusCode != HttpStatus.SC_OK) {
if (response.getEntity() != null) {
throw new EventDeliveryException(EntityUtils.toString(response.getEntity(), "UTF-8"));
} else {
throw new EventDeliveryException("Elasticsearch status code was: " + statusCode);
}
}
}