in mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/BulkEntryWriter.java [59:148]
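/**
 * Serializes the given object into a bulk entry. If serialization fails, the configured
 * error handlers are consulted in order: a handler may request a retry (optionally with a
 * replacement record), pass the failure on to the next handler, or abort. Returns the
 * serialized entry, or null if a handler elected to skip the record.
 */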
public BytesRef writeBulkEntry(Object object) {
Object toRead = object;
BytesRef writeResult = null;
boolean retryWrite = false;
boolean skip = false;
int attempts = 0;
do {
try {
retryWrite = false;
writeResult = bulkCommand.write(toRead);
} catch (Exception serializationException) {
// Create error event
List<String> passReasons = new ArrayList<String>();
SerializationFailure entry = new SerializationFailure(serializationException, object, passReasons);
// Set up error collector
SerdeErrorCollector<Object> errorCollector = new SerdeErrorCollector<Object>();
// Attempt failure handling
Exception abortException = serializationException;
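// Consult each handler in order: the first HANDLED or ABORT result ends the chain,
// while PASS defers the failure to the next handler.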
handlerloop:
for (ISerializationErrorHandler serializationErrorHandler : serializationErrorHandlers) {
HandlerResult result;
try {
result = serializationErrorHandler.onError(entry, errorCollector);
} catch (EsHadoopAbortHandlerException ahe) {
// Treat this as an abort and wrap the cause in a serialization exception.
result = HandlerResult.ABORT;
abortException = new EsHadoopSerializationException(ahe.getMessage(), ahe.getCause());
} catch (Exception e) {
LOG.error("Could not handle serialization error event due to an exception in error handler. " +
"Serialization exception:", serializationException);
throw new EsHadoopException("Encountered unexpected exception during error handler execution.", e);
}
switch (result) {
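// The handler resolved the failure, either by requesting a retry or by electing to skip the record.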
case HANDLED:
Assert.isTrue(errorCollector.getAndClearMessage() == null,
"Found pass message with Handled response. Be sure to return the value returned from " +
"the pass(String) call.");
// Check for retries
if (errorCollector.receivedRetries()) {
Object retryObject = errorCollector.getAndClearRetryValue();
if (retryObject != null) {
// Use new retry object to read
toRead = retryObject;
}
// If null, retry the same object.
// Cap the number of retries so a handler that always retries cannot loop forever.
if (attempts >= 50) {
throw new EsHadoopException("Maximum retry attempts (50) reached for serialization errors.");
} else {
retryWrite = true;
attempts++;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping a record that resulted in error while reading: [" +
object.toString() + "]");
} else {
LOG.info("Skipping a record that resulted in error while reading. (DEBUG for more info).");
}
skip = true;
}
break handlerloop;
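// The handler declined to act; record its reason and consult the next handler.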
case PASS:
String reason = errorCollector.getAndClearMessage();
if (reason != null) {
passReasons.add(reason);
}
continue handlerloop;
case ABORT:
errorCollector.getAndClearMessage(); // Defensive clearing of any stale pass message
if (abortException instanceof EsHadoopSerializationException) {
throw (EsHadoopSerializationException) abortException;
} else {
throw new EsHadoopSerializationException(abortException);
}
}
}
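// If every handler passed, fall through with writeResult still null so the check below raises a serialization error.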
}
} while (retryWrite);
if (writeResult == null && !skip) {
throw new EsHadoopSerializationException("Could not write record to bulk request.");
}
return writeResult;
}
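
For reference, a minimal sketch of a custom handler written against the contract the loop above enforces. The onError signature, the SerializationFailure#getRecord() accessor, and the collector's retry(Object) and pass(String) methods are assumptions inferred from the call sites in writeBulkEntry (and tryRepair is a hypothetical placeholder), so verify them against the actual ISerializationErrorHandler and SerdeErrorCollector definitions before relying on this.

// Sketch only: the signatures below are inferred from writeBulkEntry's call sites.
public class RepairingSerializationErrorHandler implements ISerializationErrorHandler {

    @Override
    public HandlerResult onError(SerializationFailure failure, SerdeErrorCollector<Object> collector)
            throws Exception {
        Object record = failure.getRecord();   // assumed accessor for the failing record
        Object repaired = tryRepair(record);   // hypothetical application-specific fix-up
        if (repaired != null) {
            // HANDLED plus a retry value: writeBulkEntry re-serializes the repaired record.
            return collector.retry(repaired);
        }
        // Defer to the next handler in the chain, leaving a reason for later handlers.
        return collector.pass("unable to repair record");
    }

    private Object tryRepair(Object record) {
        return null; // placeholder: real logic would return a corrected record, or null to pass
    }
}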