in mr/src/main/java/org/elasticsearch/hadoop/rest/bulk/BulkProcessor.java [181:422]
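/**
 * Attempts to flush the currently buffered bulk data to Elasticsearch. Rejected documents
 * are handed to the configured bulk write error handlers, which may retry them (possibly
 * with edited content), skip them, or abort them. Loops until nothing remains to retry,
 * then returns a BulkResponse summarizing sent, skipped, and aborted documents. Throws
 * EsHadoopException if the retry limit is exceeded, a handler itself fails, or the bulk
 * request cannot be executed.
 */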
public BulkResponse tryFlush() {
BulkResponse bulkResult = null;
boolean trackingArrayExpanded = false;
String bulkLoggingID = createDebugTxnID();
try {
// double check data - it might be a false flush (called on clean-up)
if (data.length() > 0) {
int totalDocs = data.entries();
int docsSent = 0;
int docsSkipped = 0;
int docsAborted = 0;
long totalTime = 0L;
boolean retryOperation = false;
int totalAttempts = 0;
long waitTime = 0L;
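// Bookkeeping for this flush: per-document retry attempts, plus any failures that handlers abort.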
List<BulkAttempt> retries = new ArrayList<BulkAttempt>();
List<BulkResponse.BulkError> abortErrors = new ArrayList<BulkResponse.BulkError>();
do {
// Throw to break out of a possible infinite loop, but only if the limit is non-negative
if (retryLimit >= 0 && totalAttempts > retryLimit) {
throw new EsHadoopException("Executed too many bulk requests without success. Attempted [" +
totalAttempts + "] write operations, which exceeds the bulk request retry limit specified " +
"by [" + ConfigurationOptions.ES_BATCH_WRITE_RETRY_LIMIT + "], and found data still " +
"not accepted. Perhaps there is an error handler that is not terminating? Bailing out..."
);
}
// Log messages, and if wait time is set, perform the thread sleep.
initFlushOperation(bulkLoggingID, retryOperation, retries.size(), waitTime);
// Exec bulk operation to ES, get response.
debugLog(bulkLoggingID, "Submitting request");
RestClient.BulkActionResponse bar = restClient.bulk(resource, data);
debugLog(bulkLoggingID, "Response received");
totalAttempts++;
totalTime += bar.getTimeSpent();
// Log retry stats if relevant
if (retryOperation) {
stats.docsRetried += data.entries();
stats.bytesRetried += data.length();
stats.bulkRetries++;
stats.bulkRetriesTotalTime += bar.getTimeSpent();
}
executedBulkWrite = true;
// Handle bulk write failures
if (!bar.getEntries().hasNext()) {
// Legacy Case:
// If no items on response, assume all documents made it in.
// Recorded bytes are ack'd here
stats.bytesAccepted += data.length();
stats.docsAccepted += data.entries();
retryOperation = false;
bulkResult = BulkResponse.complete(bar.getResponseCode(), totalTime, totalDocs, totalDocs, 0);
} else {
// Base Case:
// Iterate over the response and the data in the tracking bytes array at the same time, passing
// errors to error handlers for resolution.
// Keep track of which document we are on as well as where we are in the tracking bytes array.
int documentNumber = 0;
int trackingBytesPosition = 0;
// Hand off the previous list of retries so that we can track the next set of retries (if any).
List<BulkAttempt> previousRetries = retries;
retries = new ArrayList<BulkAttempt>();
// If a document is edited and retried, it is appended to the end of the buffer. Keep a tail list of these new retry attempts.
List<BulkAttempt> newDocumentRetries = new ArrayList<BulkAttempt>();
BulkWriteErrorCollector errorCollector = new BulkWriteErrorCollector();
// Iterate over all entries, and for each error found, attempt to handle the problem.
for (Iterator<Map> iterator = bar.getEntries(); iterator.hasNext(); ) {
// Each entry is an (operation -> document info) map
Map map = iterator.next();
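// e.g. { "index": { "_id": "...", "status": 409, "error": {...} } }; the single key is the bulk operation name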
// Get the underlying document information as a map and extract the error information.
Map values = (Map) map.values().iterator().next();
Integer docStatus = (Integer) values.get("status");
EsHadoopException error = errorExtractor.extractError(values);
if (error == null){
// Write operation for this entry succeeded
stats.bytesAccepted += data.length(trackingBytesPosition);
stats.docsAccepted += 1;
docsSent += 1;
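// Note: remove() shifts later entries down one slot, so trackingBytesPosition is deliberately
// not incremented here; it only advances when an entry is kept in place for retry.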
data.remove(trackingBytesPosition);
} else {
// Found a failed write
BytesArray document = data.entry(trackingBytesPosition);
// In pre-2.x ES versions, the status is not included.
int status = docStatus == null ? -1 : docStatus;
// Determine which attempt this send was for the document, and which position the doc originally held
BulkAttempt previousAttempt;
if (previousRetries.isEmpty()) {
// No previous retries, create an attempt for the first run
previousAttempt = new BulkAttempt(1, documentNumber);
} else {
// Grab the previous attempt for the document we're processing, and bump the attempt number.
previousAttempt = previousRetries.get(documentNumber);
previousAttempt.attemptNumber++;
}
// Handle bulk write failures
// TODO: We should really do more with these bulk error pass reasons if the final outcome is an ABORT.
List<String> bulkErrorPassReasons = new ArrayList<String>();
BulkWriteFailure failure = new BulkWriteFailure(
status,
error,
document,
previousAttempt.attemptNumber,
bulkErrorPassReasons
);
// Label the loop since we break out of it, and continue it, from within a switch block.
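// Handler semantics: HANDLED stops the chain for this document, PASS defers to the next
// handler, and ABORT drops the document and records the failure.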
handlerLoop: for (IBulkWriteErrorHandler errorHandler : documentBulkErrorHandlers) {
HandlerResult result;
try {
result = errorHandler.onError(failure, errorCollector);
} catch (EsHadoopAbortHandlerException ahe) {
// Count this as an abort operation, but capture the error message from the
// exception as the reason. Log any cause since it will be swallowed.
Throwable cause = ahe.getCause();
if (cause != null) {
LOG.error("Bulk write error handler abort exception caught with underlying cause:", cause);
}
result = HandlerResult.ABORT;
error = ahe;
} catch (Exception e) {
throw new EsHadoopException("Encountered exception during error handler.", e);
}
switch (result) {
case HANDLED:
Assert.isTrue(errorCollector.getAndClearMessage() == null,
"Found pass message with Handled response. Be sure to return the value " +
"returned from the pass(String) call.");
// Check for document retries
if (errorCollector.receivedRetries()) {
byte[] retryDataBuffer = errorCollector.getAndClearRetryValue();
if (retryDataBuffer == null || document.bytes() == retryDataBuffer) {
// Retry the same data.
// Continue to track the previous attempts.
retries.add(previousAttempt);
trackingBytesPosition++;
} else {
// Check the document contents to see if the doc was simply deserialized and reserialized unchanged.
if (ArrayUtils.sliceEquals(document.bytes(), document.offset(), document.length(), retryDataBuffer, 0, retryDataBuffer.length)) {
// Same document content. Leave the data as is in tracking buffer,
// and continue tracking previous attempts.
retries.add(previousAttempt);
trackingBytesPosition++;
} else {
// Document has changed.
// Track new attempts.
BytesRef newEntry = validateEditedEntry(retryDataBuffer);
data.remove(trackingBytesPosition);
data.copyFrom(newEntry);
// Determine if our tracking bytes array is going to expand.
if (ba.available() < newEntry.length()) {
trackingArrayExpanded = true;
}
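// The rewritten document is treated as a brand new submission: restart its attempt
// count (it is bumped back to 1 if this new version fails again).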
previousAttempt.attemptNumber = 0;
newDocumentRetries.add(previousAttempt);
}
}
} else {
// Handled but not retried means the document will not be sent; drop it and count it as skipped.
data.remove(trackingBytesPosition);
docsSkipped += 1;
}
break handlerLoop;
case PASS:
String reason = errorCollector.getAndClearMessage();
if (reason != null) {
bulkErrorPassReasons.add(reason);
}
continue handlerLoop;
case ABORT:
errorCollector.getAndClearMessage(); // Sanity clearing
data.remove(trackingBytesPosition);
docsAborted += 1;
abortErrors.add(new BulkResponse.BulkError(previousAttempt.originalPosition, document, status, error));
break handlerLoop;
}
}
}
documentNumber++;
}
// Any new documents appended to the end of the data buffer go at the end of the retry list.
retries.addAll(newDocumentRetries);
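// At this point the data buffer holds only the entries queued for retry, in the same order
// as the retries list, so on the next pass the response entries line up with previousRetries by index.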
if (!retries.isEmpty()) {
retryOperation = true;
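// Use whatever backoff delay the handlers requested for the next pass (presumably 0 if none was requested).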
waitTime = errorCollector.getDelayTimeBetweenRetries();
} else {
retryOperation = false;
if (docsAborted > 0) {
bulkResult = BulkResponse.partial(bar.getResponseCode(), totalTime, totalDocs, docsSent, docsSkipped, docsAborted, abortErrors);
} else {
bulkResult = BulkResponse.complete(bar.getResponseCode(), totalTime, totalDocs, docsSent, docsSkipped);
}
}
}
} while (retryOperation);
debugLog(bulkLoggingID, "Completed. [%d] Original Entries. [%d] Attempts. [%d/%d] Docs Sent. [%d/%d] Docs Skipped. [%d/%d] Docs Aborted.",
totalDocs,
totalAttempts,
docsSent, totalDocs,
docsSkipped, totalDocs,
docsAborted, totalDocs
);
} else {
bulkResult = BulkResponse.complete();
}
} catch (EsHadoopException ex) {
debugLog(bulkLoggingID, "Failed. %s", ex.getMessage());
hadWriteErrors = true;
throw ex;
}
// Always discard the buffered data; no code path reuses the in-flight data after this point.
// During retry operations the tracking bytes array may have grown; in that case, do a hard reset.
// TODO: Perhaps open an issue to limit the expansion of a single byte array (for repeated rewrite-retries)
if (trackingArrayExpanded) {
ba = new BytesArray(new byte[settings.getBatchSizeInBytes()], 0);
data = new TrackingBytesArray(ba);
} else {
data.reset();
dataEntries = 0;
}
return bulkResult;
}
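
// A minimal usage sketch, not part of this file: it assumes the surrounding BulkProcessor
// API (a (RestClient, Resource, Settings) constructor, add(BytesRef), flush(), close());
// exact signatures may differ.
//
//     BulkProcessor bulk = new BulkProcessor(restClient, resource, settings);
//     for (BytesRef doc : serializedDocs) {
//         bulk.add(doc);    // buffers the document; a full batch triggers a flush internally
//     }
//     bulk.flush();         // drains whatever is left; fails if entries could not be written
//     bulk.close();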