in core/src/main/java/org/apache/sdap/mudrod/tools/EONETIngester.java [111:148]
/**
 * Upserts every event in {@code jsonEventsArray} into the Elasticsearch index
 * named by {@link MudrodConstants#ES_INDEX_NAME}, using the document type
 * {@code eonet_event} and the event's {@code id} member as the document id.
 *
 * <p>For each event an {@link UpdateRequest} is built whose {@code doc} and
 * {@code upsert} payloads are both the output of {@code executeEventMapping}.
 * The request is added to the driver's {@link BulkProcessor} and is also
 * executed synchronously through the driver's client. An event whose mapping
 * throws {@link NumberFormatException} is logged and skipped. The bulk
 * processor is always destroyed before this method returns.
 *
 * @param mEngine          engine whose configuration supplies the index name
 * @param esDriver         driver providing the bulk processor and the client
 * @param jsonEventsArray  array of JSON objects, each carrying an {@code id} member
 * @return always the empty string; no per-event result is reported to the caller
 */
private String executeBulkIndexRequest(MudrodEngine mEngine, ESDriver esDriver, JsonArray jsonEventsArray) {
  String index = mEngine.getConfig().getProperty(MudrodConstants.ES_INDEX_NAME);
  String eventType = "eonet_event";

  esDriver.createBulkProcessor();
  try {
    BulkProcessor bp = esDriver.getBulkProcessor();
    //for each event
    for (JsonElement jsonElement : jsonEventsArray) {
      JsonObject event = jsonElement.getAsJsonObject();
      // NOTE(review): toString() yields the JSON rendering of the member, so a
      // string id would keep its surrounding quotes; getAsString() may have been
      // intended - confirm against already-indexed documents before changing.
      String eventId = event.get("id").toString();

      UpdateRequest updateRequest;
      try {
        IndexRequest indexRequest = new IndexRequest(
            index, eventType, eventId).source(executeEventMapping(event));
        updateRequest =
            new UpdateRequest(index, eventType, eventId).upsert(indexRequest);
        updateRequest.doc(indexRequest);
        bp.add(updateRequest);
      } catch (NumberFormatException e) {
        LOG.error("Error whilst processing numbers", e);
        // No usable request exists for this event; skip it rather than
        // handing a null request to the client below.
        continue;
      }

      // NOTE(review): the same request was queued on the bulk processor above,
      // so each event is submitted twice - confirm whether one path can go.
      try {
        esDriver.getClient().update(updateRequest).get();
      } catch (InterruptedException e) {
        LOG.error("Failed to execute bulk Index request : ", e);
        // Restore the interrupt flag and stop: with the flag set, every
        // further blocking get() would fail immediately as well.
        Thread.currentThread().interrupt();
        break;
      } catch (ExecutionException e) {
        LOG.error("Failed to execute bulk Index request : ", e);
      }
    }
  } finally {
    // Release the bulk processor even if an event triggers a runtime exception.
    esDriver.destroyBulkProcessor();
  }
  return "";
}