public BytesRef writeBulkEntry()

in mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/BulkEntryWriter.java [59:148]


    /**
     * Serializes the given record into a bulk request entry, consulting the configured
     * serialization error handlers whenever the write fails.
     * <p>
     * On a failed write each handler is offered the failure in order until one of them
     * returns a terminal result:
     * <ul>
     *   <li>{@code HANDLED} with a retry request - the write is attempted again, using the
     *       replacement object if the handler supplied one, otherwise the same object;</li>
     *   <li>{@code HANDLED} without a retry request - the record is skipped;</li>
     *   <li>{@code PASS} - the optional pass reason is recorded and the next handler is tried;</li>
     *   <li>{@code ABORT} - the failure is rethrown as an {@code EsHadoopSerializationException}.</li>
     * </ul>
     *
     * @param object the record to serialize
     * @return the serialized entry, or {@code null} when an error handler chose to skip the record
     * @throws EsHadoopSerializationException if a handler aborts, or if no entry was produced and
     *         the record was not skipped (the last serialization failure, if any, is attached as
     *         the cause)
     * @throws EsHadoopException if a handler itself throws an unexpected exception, or if handlers
     *         keep requesting retries after the retry limit has been reached
     */
    public BytesRef writeBulkEntry(Object object) {
        // Upper bound on handler-requested retries for a single record, so that a handler which
        // always asks for a retry cannot spin forever.
        final int maxRetryAttempts = 50;

        Object toRead = object;
        BytesRef writeResult = null;
        boolean retryWrite = false;
        boolean skip = false;
        int attempts = 0;
        // Failure raised by the most recent write attempt; kept so that it can be reported as the
        // cause if no handler deals with it.
        Exception lastFailure = null;
        do {
            try {
                retryWrite = false;
                lastFailure = null;
                writeResult = bulkCommand.write(toRead);
            } catch (Exception serializationException) {
                lastFailure = serializationException;

                // Create error event. The event always carries the original record, even when a
                // handler-supplied replacement is the object that failed on this attempt.
                List<String> passReasons = new ArrayList<String>();
                SerializationFailure entry = new SerializationFailure(serializationException, object, passReasons);

                // Set up error collector
                SerdeErrorCollector<Object> errorCollector = new SerdeErrorCollector<Object>();

                // Attempt failure handling
                Exception abortException = serializationException;
                handlerloop:
                for (ISerializationErrorHandler serializationErrorHandler : serializationErrorHandlers) {
                    HandlerResult result;
                    try {
                        result = serializationErrorHandler.onError(entry, errorCollector);
                    } catch (EsHadoopAbortHandlerException ahe) {
                        // Count this as an abort operation. Wrap cause in a serialization exception.
                        result = HandlerResult.ABORT;
                        abortException = new EsHadoopSerializationException(ahe.getMessage(), ahe.getCause());
                    } catch (Exception e) {
                        // The handler itself is broken: log the original failure so it is not lost,
                        // then surface the handler's exception.
                        LOG.error("Could not handle serialization error event due to an exception in error handler. " +
                                "Serialization exception:", serializationException);
                        throw new EsHadoopException("Encountered unexpected exception during error handler execution.", e);
                    }

                    switch (result) {
                        case HANDLED:
                            // A pending pass message here means the handler called pass(String) but
                            // did not return its result.
                            Assert.isTrue(errorCollector.getAndClearMessage() == null,
                                    "Found pass message with Handled response. Be sure to return the value returned from " +
                                            "the pass(String) call.");
                            // Check for retries
                            if (errorCollector.receivedRetries()) {
                                Object retryObject = errorCollector.getAndClearRetryValue();
                                if (retryObject != null) {
                                    // Use new retry object to read
                                    toRead = retryObject;
                                }
                                // If null, retry same object.

                                if (attempts >= maxRetryAttempts) {
                                    // Keep the triggering failure as the cause so the root problem
                                    // stays visible in the stack trace.
                                    throw new EsHadoopException("Maximum retry attempts (" + maxRetryAttempts +
                                            ") reached for serialization errors.", serializationException);
                                } else {
                                    retryWrite = true;
                                    attempts++;
                                }
                            } else {
                                if (LOG.isDebugEnabled()) {
                                    // Plain concatenation (rather than object.toString()) so that a
                                    // null record cannot raise a NullPointerException while logging.
                                    LOG.debug("Skipping a record that resulted in error while reading: [" +
                                            object + "]");
                                } else {
                                    LOG.info("Skipping a record that resulted in error while reading. (DEBUG for more info).");
                                }
                                skip = true;
                            }
                            break handlerloop;
                        case PASS:
                            String reason = errorCollector.getAndClearMessage();
                            if (reason != null) {
                                passReasons.add(reason);
                            }
                            continue handlerloop;
                        case ABORT:
                            errorCollector.getAndClearMessage(); // Sanity clearing
                            if (abortException instanceof EsHadoopSerializationException) {
                                throw (EsHadoopSerializationException) abortException;
                            } else {
                                throw new EsHadoopSerializationException(abortException);
                            }
                    }
                }
            }
        } while (retryWrite);

        if (writeResult == null && skip == false) {
            // Reached when the write produced nothing and no handler skipped the record, e.g. every
            // handler passed on the failure. Attach that failure (if there was one) as the cause.
            String message = "Could not write record to bulk request.";
            if (lastFailure != null) {
                throw new EsHadoopSerializationException(message, lastFailure);
            }
            throw new EsHadoopSerializationException(message);
        }

        return writeResult;
    }