in gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java [53:173]
public FutureCallbackHolder(final @Nullable WriteCallback callback,
ExceptionLogger exceptionLogger,
final MalformedDocPolicy malformedDocPolicy) {
this.future = new Future<WriteResponse>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return done.get();
}
@Override
public WriteResponse get()
throws InterruptedException, ExecutionException {
Pair<WriteResponse, Throwable> writeResponseThrowablePair = writeResponseQueue.take();
return getWriteResponseorThrow(writeResponseThrowablePair);
}
@Override
public WriteResponse get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Pair<WriteResponse, Throwable> writeResponseThrowablePair = writeResponseQueue.poll(timeout, unit);
if (writeResponseThrowablePair == null) {
throw new TimeoutException("Timeout exceeded while waiting for future to be done");
} else {
return getWriteResponseorThrow(writeResponseThrowablePair);
}
}
};
this.actionListener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
if (bulkItemResponses.hasFailures()) {
boolean logicalErrors = false;
boolean serverErrors = false;
for (BulkItemResponse bulkItemResponse: bulkItemResponses) {
if (bulkItemResponse.isFailed()) {
// check if the failure is permanent (logical) or transient (server)
if (isLogicalError(bulkItemResponse)) {
// check error policy
switch (malformedDocPolicy) {
case IGNORE: {
log.debug("Document id {} was malformed with error {}",
bulkItemResponse.getId(),
bulkItemResponse.getFailureMessage());
break;
}
case WARN: {
log.warn("Document id {} was malformed with error {}",
bulkItemResponse.getId(),
bulkItemResponse.getFailureMessage());
break;
}
default: {
// Pass through
}
}
logicalErrors = true;
} else {
serverErrors = true;
}
}
}
if (serverErrors) {
onFailure(new RuntimeException("Partial failures in the batch: " + bulkItemResponses.buildFailureMessage()));
} else if (logicalErrors) {
// all errors found were logical, throw RuntimeException if policy says to Fail
switch (malformedDocPolicy) {
case FAIL: {
onFailure(new RuntimeException("Partial non-recoverable failures in the batch. To ignore these, set "
+ ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY + " to "
+ MalformedDocPolicy.IGNORE.name()));
break;
}
default: {
WriteResponse writeResponse = new GenericWriteResponse<BulkResponse>(bulkItemResponses);
writeResponseQueue.add(new Pair<WriteResponse, Throwable>(writeResponse, null));
if (callback != null) {
callback.onSuccess(writeResponse);
}
}
}
}
} else {
WriteResponse writeResponse = new GenericWriteResponse<BulkResponse>(bulkItemResponses);
writeResponseQueue.add(new Pair<WriteResponse, Throwable>(writeResponse, null));
if (callback != null) {
callback.onSuccess(writeResponse);
}
}
}
private boolean isLogicalError(BulkItemResponse bulkItemResponse) {
String failureMessage = bulkItemResponse.getFailureMessage();
return failureMessage.contains("IllegalArgumentException")
|| failureMessage.contains("illegal_argument_exception")
|| failureMessage.contains("MapperParsingException")
|| failureMessage.contains("mapper_parsing_exception");
}
@Override
public void onFailure(Exception exception) {
writeResponseQueue.add(new Pair<WriteResponse, Throwable>(null, exception));
if (exceptionLogger != null) {
exceptionLogger.log(exception);
}
if (callback != null) {
callback.onFailure(exception);
}
}
};
}