public BulkResponse tryFlush()

in mr/src/main/java/org/elasticsearch/hadoop/rest/bulk/BulkProcessor.java [181:422]


    /**
     * Sends the currently buffered documents to Elasticsearch as a bulk request, routing every
     * per-document failure through the configured bulk write error handlers and re-sending any
     * documents the handlers ask to retry, until no retries remain.
     * <p>
     * If the buffer is empty (e.g. a clean-up call), no request is made and
     * {@link BulkResponse#complete()} is returned. On normal completion the buffered data is always
     * discarded; if the tracking byte array had to grow to hold edited retry documents, a fresh
     * buffer of {@code settings.getBatchSizeInBytes()} bytes is allocated instead of reusing it.
     *
     * @return the outcome of the flush: a complete response, or a partial one listing aborted
     *         documents when any error handler returned {@code ABORT}
     * @throws EsHadoopException if more than {@code retryLimit} bulk requests are executed (when the
     *         limit is non-negative), if an error handler throws an unexpected exception, or if the
     *         bulk call itself fails with an {@code EsHadoopException}; {@code hadWriteErrors} is set
     *         before rethrowing and the buffered data is left in place
     */
    public BulkResponse tryFlush() {
        BulkResponse bulkResult = null;
        boolean trackingArrayExpanded = false;
        String bulkLoggingID = createDebugTxnID();

        try {
            // double check data - it might be a false flush (called on clean-up)
            if (data.length() > 0) {
                int totalDocs = data.entries();
                int docsSent = 0;
                int docsSkipped = 0;
                int docsAborted = 0;
                long totalTime = 0L;
                boolean retryOperation = false;
                int totalAttempts = 0;
                long waitTime = 0L;
                // One BulkAttempt per document still in the buffer, in buffer order.
                List<BulkAttempt> retries = new ArrayList<BulkAttempt>();
                List<BulkResponse.BulkError> abortErrors = new ArrayList<BulkResponse.BulkError>();

                do {
                    // Throw to break out of a possible infinite loop, but only if the limit is non-negative
                    // (a negative limit disables this guard).
                    if (retryLimit >= 0 && totalAttempts > retryLimit) {
                        throw new EsHadoopException("Executed too many bulk requests without success. Attempted [" +
                                totalAttempts + "] write operations, which exceeds the bulk request retry limit specified " +
                                "by [" + ConfigurationOptions.ES_BATCH_WRITE_RETRY_LIMIT + "], and found data still " +
                                "not accepted. Perhaps there is an error handler that is not terminating? Bailing out..."
                        );
                    }

                    // Log messages, and if wait time is set, perform the thread sleep.
                    initFlushOperation(bulkLoggingID, retryOperation, retries.size(), waitTime);

                    // Exec bulk operation to ES, get response.
                    debugLog(bulkLoggingID, "Submitting request");
                    RestClient.BulkActionResponse bar = restClient.bulk(resource, data);
                    debugLog(bulkLoggingID, "Response received");
                    totalAttempts++;
                    totalTime += bar.getTimeSpent();

                    // Log retry stats if relevant
                    if (retryOperation) {
                        stats.docsRetried += data.entries();
                        stats.bytesRetried += data.length();
                        stats.bulkRetries++;
                        stats.bulkRetriesTotalTime += bar.getTimeSpent();
                    }
                    executedBulkWrite = true;

                    // Handle bulk write failures
                    if (!bar.getEntries().hasNext()) {
                        // Legacy Case:
                        // If no items on response, assume all documents made it in.
                        // Recorded bytes are ack'd here
                        stats.bytesAccepted += data.length();
                        stats.docsAccepted += data.entries();
                        retryOperation = false;
                        bulkResult = BulkResponse.complete(bar.getResponseCode(), totalTime, totalDocs, totalDocs, 0);
                    } else {
                        // Base Case:
                        // Iterate over the response and the data in the tracking bytes array at the same time, passing
                        // errors to error handlers for resolution.

                        // Keep track of which document we are on as well as where we are in the tracking bytes array.
                        // documentNumber indexes the response items; trackingBytesPosition only advances past
                        // entries that stay in the buffer (entries removed from the buffer shift the rest down).
                        int documentNumber = 0;
                        int trackingBytesPosition = 0;

                        // Hand off the previous list of retries so that we can track the next set of retries (if any).
                        List<BulkAttempt> previousRetries = retries;
                        retries = new ArrayList<BulkAttempt>();

                        // If a document is edited and retried then it is added at the end of the buffer. Keep a tail list of these new retry attempts.
                        List<BulkAttempt> newDocumentRetries = new ArrayList<BulkAttempt>();

                        BulkWriteErrorCollector errorCollector = new BulkWriteErrorCollector();

                        // Iterate over all entries, and for each error found, attempt to handle the problem.
                        for (Iterator<Map> iterator = bar.getEntries(); iterator.hasNext(); ) {

                            // The array of maps are (operation -> document info) maps
                            Map map = iterator.next();
                            // Get the underlying document information as a map and extract the error information.
                            Map values = (Map) map.values().iterator().next();
                            Integer docStatus = (Integer) values.get("status");
                            EsHadoopException error = errorExtractor.extractError(values);

                            if (error == null){
                                // Write operation for this entry succeeded
                                stats.bytesAccepted += data.length(trackingBytesPosition);
                                stats.docsAccepted += 1;
                                docsSent += 1;
                                data.remove(trackingBytesPosition);
                            } else {
                                // Found a failed write
                                BytesArray document = data.entry(trackingBytesPosition);

                                // In pre-2.x ES versions, the status is not included.
                                int status = docStatus == null ? -1 : docStatus;

                                // Figure out which attempt number sending this document was and which position the doc was in
                                BulkAttempt previousAttempt;
                                if (previousRetries.isEmpty()) {
                                    // No previous retries, create an attempt for the first run
                                    previousAttempt = new BulkAttempt(1, documentNumber);
                                } else {
                                    // Grab the previous attempt for the document we're processing, and bump the attempt number.
                                    previousAttempt = previousRetries.get(documentNumber);
                                    previousAttempt.attemptNumber++;
                                }

                                // Handle bulk write failures
                                // Todo: We should really do more with these bulk error pass reasons if the final outcome is an ABORT.
                                List<String> bulkErrorPassReasons = new ArrayList<String>();
                                BulkWriteFailure failure = new BulkWriteFailure(
                                        status,
                                        error,
                                        document,
                                        previousAttempt.attemptNumber,
                                        bulkErrorPassReasons
                                );

                                // Label the loop since we'll be breaking to/from it within a switch block.
                                handlerLoop: for (IBulkWriteErrorHandler errorHandler : documentBulkErrorHandlers) {
                                    HandlerResult result;
                                    try {
                                        result = errorHandler.onError(failure, errorCollector);
                                    } catch (EsHadoopAbortHandlerException ahe) {
                                        // Count this as an abort operation, but capture the error message from the
                                        // exception as the reason. Log any cause since it will be swallowed.
                                        Throwable cause = ahe.getCause();
                                        if (cause != null) {
                                            LOG.error("Bulk write error handler abort exception caught with underlying cause:", cause);
                                        }
                                        result = HandlerResult.ABORT;
                                        error = ahe;
                                    } catch (Exception e) {
                                        throw new EsHadoopException("Encountered exception during error handler.", e);
                                    }

                                    switch (result) {
                                        case HANDLED:
                                            Assert.isTrue(errorCollector.getAndClearMessage() == null,
                                                    "Found pass message with Handled response. Be sure to return the value " +
                                                            "returned from pass(String) call.");
                                            // Check for document retries
                                            if (errorCollector.receivedRetries()) {
                                                byte[] retryDataBuffer = errorCollector.getAndClearRetryValue();
                                                // Reference comparison on purpose: the handler handed back the very same buffer.
                                                if (retryDataBuffer == null || document.bytes() == retryDataBuffer) {
                                                    // Retry the same data.
                                                    // Continue to track the previous attempts.
                                                    retries.add(previousAttempt);
                                                    trackingBytesPosition++;
                                                } else {
                                                    // Check document contents to see if it was deserialized and reserialized.
                                                    if (ArrayUtils.sliceEquals(document.bytes(), document.offset(), document.length(), retryDataBuffer, 0, retryDataBuffer.length)) {
                                                        // Same document content. Leave the data as is in tracking buffer,
                                                        // and continue tracking previous attempts.
                                                        retries.add(previousAttempt);
                                                        trackingBytesPosition++;
                                                    } else {
                                                        // Document has changed.
                                                        // Track new attempts.
                                                        BytesRef newEntry = validateEditedEntry(retryDataBuffer);
                                                        // Determine if our tracking bytes array is going to expand. This must be
                                                        // checked BEFORE the copy: afterwards a grown array may have spare room
                                                        // (missing the growth), or an un-grown one may look full (false positive).
                                                        if (ba.available() < newEntry.length()) {
                                                            trackingArrayExpanded = true;
                                                        }
                                                        data.remove(trackingBytesPosition);
                                                        // The edited document is appended to the end of the buffer.
                                                        data.copyFrom(newEntry);
                                                        // Reset so the attempt number bumps to 1 on the next round.
                                                        previousAttempt.attemptNumber = 0;
                                                        newDocumentRetries.add(previousAttempt);
                                                    }
                                                }
                                            } else {
                                                // Handled but not retried means we won't have sent that document.
                                                data.remove(trackingBytesPosition);
                                                docsSkipped += 1;
                                            }
                                            break handlerLoop;
                                        case PASS:
                                            String reason = errorCollector.getAndClearMessage();
                                            if (reason != null) {
                                                bulkErrorPassReasons.add(reason);
                                            }
                                            continue handlerLoop;
                                        case ABORT:
                                            errorCollector.getAndClearMessage(); // Sanity clearing
                                            data.remove(trackingBytesPosition);
                                            docsAborted += 1;
                                            abortErrors.add(new BulkResponse.BulkError(previousAttempt.originalPosition, document, status, error));
                                            break handlerLoop;
                                    }
                                }
                            }
                            documentNumber++;
                        }

                        // Place any new documents that have been added at the end of the data buffer at the end of the retry list.
                        // This keeps the retry list in the same order as the entries in the tracking buffer.
                        retries.addAll(newDocumentRetries);

                        if (!retries.isEmpty()) {
                            retryOperation = true;
                            waitTime = errorCollector.getDelayTimeBetweenRetries();
                        } else {
                            retryOperation = false;
                            if (docsAborted > 0) {
                                bulkResult = BulkResponse.partial(bar.getResponseCode(), totalTime, totalDocs, docsSent, docsSkipped, docsAborted, abortErrors);
                            } else {
                                bulkResult = BulkResponse.complete(bar.getResponseCode(), totalTime, totalDocs, docsSent, docsSkipped);
                            }
                        }
                    }
                } while (retryOperation);

                debugLog(bulkLoggingID, "Completed. [%d] Original Entries. [%d] Attempts. [%d/%d] Docs Sent. [%d/%d] Docs Skipped. [%d/%d] Docs Aborted.",
                        totalDocs,
                        totalAttempts,
                        docsSent, totalDocs,
                        docsSkipped, totalDocs,
                        docsAborted, totalDocs
                );
            } else {
                bulkResult = BulkResponse.complete();
            }
        } catch (EsHadoopException ex) {
            debugLog(bulkLoggingID, "Failed. %s", ex.getMessage());
            hadWriteErrors = true;
            throw ex;
        }

        // Always discard data since there's no code path that uses the in-flight data.
        // During retry operations the tracking bytes array may grow; in that case do a hard reset so the
        // buffer goes back to the configured batch size.
        // TODO: Perhaps open an issue to limit the expansion of a single byte array (for repeated rewrite-retries)
        if (trackingArrayExpanded) {
            ba = new BytesArray(new byte[settings.getBatchSizeInBytes()], 0);
            data = new TrackingBytesArray(ba);
        } else {
            data.reset();
        }
        // Both branches discard every buffered entry, so the entry counter must be cleared either way.
        // NOTE(review): dataEntries is presumably the count consulted by the entry-count flush threshold
        // elsewhere in this class; a stale value would trigger premature flushes.
        dataEntries = 0;

        return bulkResult;
    }