in persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java [582:662]
/**
 * Returns the bulk processor used by this service, building it on first call
 * and returning the cached instance afterwards.
 *
 * <p>The processor is configured from the following string settings, each of
 * which is only applied when it is non-null:</p>
 * <ul>
 *   <li>{@code bulkProcessorConcurrentRequests}: parsed as an integer; only values
 *       greater than 1 are passed to the builder.</li>
 *   <li>{@code bulkProcessorBulkActions}: parsed as an integer.</li>
 *   <li>{@code bulkProcessorBulkSize}: parsed as a byte size value.</li>
 *   <li>{@code bulkProcessorFlushInterval}: parsed as a time value; when null,
 *       a flush interval of 5 seconds is used.</li>
 *   <li>{@code bulkProcessorBackoffPolicy}: one of {@code nobackoff},
 *       {@code constant(delay,maxNumberOfRetries)}, {@code exponential} or
 *       {@code exponential(delay,maxNumberOfRetries)} (case-insensitive). Empty or
 *       unrecognized values leave the builder's backoff policy untouched.</li>
 * </ul>
 *
 * <p>NOTE(review): the lazy initialization below is not synchronized in this
 * method; confirm whether callers can invoke it concurrently.</p>
 *
 * @return the configured bulk processor
 * @throws NumberFormatException if a numeric setting cannot be parsed
 * @throws IllegalArgumentException if a parameterized backoff policy is malformed
 */
public BulkProcessor getBulkProcessor() {
    if (bulkProcessor != null) {
        return bulkProcessor;
    }
    BulkProcessor.Listener bulkProcessorListener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId,
                               BulkRequest request) {
            LOGGER.debug("Before Bulk");
        }

        @Override
        public void afterBulk(long executionId,
                              BulkRequest request,
                              BulkResponse response) {
            LOGGER.debug("After Bulk");
        }

        @Override
        public void afterBulk(long executionId,
                              BulkRequest request,
                              Throwable failure) {
            LOGGER.error("After Bulk (failure)", failure);
        }
    };
    BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
            (request, bulkListener) ->
                    client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
            bulkProcessorListener);
    if (bulkProcessorConcurrentRequests != null) {
        int concurrentRequests = Integer.parseInt(bulkProcessorConcurrentRequests);
        if (concurrentRequests > 1) {
            bulkProcessorBuilder.setConcurrentRequests(concurrentRequests);
        }
    }
    if (bulkProcessorBulkActions != null) {
        int bulkActions = Integer.parseInt(bulkProcessorBulkActions);
        bulkProcessorBuilder.setBulkActions(bulkActions);
    }
    if (bulkProcessorBulkSize != null) {
        bulkProcessorBuilder.setBulkSize(ByteSizeValue.parseBytesSizeValue(bulkProcessorBulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), BULK_PROCESSOR_BULK_SIZE));
    }
    if (bulkProcessorFlushInterval != null) {
        bulkProcessorBuilder.setFlushInterval(TimeValue.parseTimeValue(bulkProcessorFlushInterval, null, BULK_PROCESSOR_FLUSH_INTERVAL));
    } else {
        // in ElasticSearch this defaults to null, but we would like to set a value to 5 seconds by default
        bulkProcessorBuilder.setFlushInterval(new TimeValue(5, TimeUnit.SECONDS));
    }
    if (bulkProcessorBackoffPolicy != null) {
        // Locale.ROOT keeps keyword matching independent of the JVM default locale
        // (e.g. the Turkish dotless i would otherwise break "exponential").
        String backoffPolicyStr = bulkProcessorBackoffPolicy.trim().toLowerCase(java.util.Locale.ROOT);
        if ("nobackoff".equals(backoffPolicyStr)) {
            bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.noBackoff());
        } else if (backoffPolicyStr.startsWith("constant(")) {
            String[] params = extractBackoffPolicyParameters(backoffPolicyStr);
            TimeValue delay = TimeValue.parseTimeValue(params[0], new TimeValue(5, TimeUnit.SECONDS), BULK_PROCESSOR_BACKOFF_POLICY);
            int maxNumberOfRetries = Integer.parseInt(params[1]);
            bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(delay, maxNumberOfRetries));
        } else if (backoffPolicyStr.startsWith("exponential")) {
            if (!backoffPolicyStr.contains("(")) {
                bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff());
            } else {
                // we detected parameters, must process them.
                String[] params = extractBackoffPolicyParameters(backoffPolicyStr);
                TimeValue delay = TimeValue.parseTimeValue(params[0], new TimeValue(5, TimeUnit.SECONDS), BULK_PROCESSOR_BACKOFF_POLICY);
                int maxNumberOfRetries = Integer.parseInt(params[1]);
                bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(delay, maxNumberOfRetries));
            }
        }
    }
    bulkProcessor = bulkProcessorBuilder.build();
    return bulkProcessor;
}

/**
 * Extracts the two comma-separated parameters of a backoff policy expression of
 * the form {@code name(delay,maxNumberOfRetries)}.
 *
 * @param backoffPolicyStr the backoff policy expression
 * @return a two-element array holding the trimmed delay and retry-count strings
 * @throws IllegalArgumentException if the parentheses or the separator are missing
 */
private static String[] extractBackoffPolicyParameters(String backoffPolicyStr) {
    // Parameters start right after the opening parenthesis; the previous code
    // searched for the prefix concatenated with its own length, which never matched.
    int paramStartPos = backoffPolicyStr.indexOf('(') + 1;
    int paramSeparatorPos = backoffPolicyStr.indexOf(',', paramStartPos);
    int paramEndPos = backoffPolicyStr.indexOf(')', paramSeparatorPos + 1);
    if (paramStartPos == 0 || paramSeparatorPos < 0 || paramEndPos < 0) {
        throw new IllegalArgumentException("Invalid bulk processor backoff policy '" + backoffPolicyStr
                + "', expected format: name(delay,maxNumberOfRetries)");
    }
    return new String[] {
            backoffPolicyStr.substring(paramStartPos, paramSeparatorPos).trim(),
            backoffPolicyStr.substring(paramSeparatorPos + 1, paramEndPos).trim()
    };
}