private AsyncHandler getHandler()

in src/main/java/com/amazonaws/services/dynamodbv2/streams/connectors/DynamoDBReplicationEmitter.java [314:377]


    /**
     * Builds the asynchronous callback used for one emitted {@code record}.
     *
     * <p>The returned handler reacts to the outcome of the request as follows:
     * <ul>
     *   <li><b>Success</b> - traces the record's sequence number and counts down {@code doneSignal}.</li>
     *   <li><b>Retryable error</b> ({@code ProvisionedThroughputExceededException} or
     *       {@code InternalServerErrorException}) - increments {@code retryCount} and puts the record back on
     *       {@code toSubmit}. {@code doneSignal} is <em>not</em> counted down on this path.</li>
     *   <li><b>{@code ItemCollectionSizeLimitExceededException} or an {@code AmazonServiceException} with HTTP status
     *       413</b> - logs the error; if {@code skipErrors} is set the record is added to {@code failedRecords} and
     *       {@code doneSignal} is counted down, otherwise the JVM exits with {@code StatusCodes.EIO}.</li>
     *   <li><b>Any other exception</b> - logged as fatal and the JVM exits with {@code StatusCodes.EIO}.</li>
     * </ul>
     *
     * @param toSubmit      queue that receives the record again when the failure is retryable
     * @param failedRecords list that collects records skipped because of a non-retryable error
     * @param retryCount    counter incremented once for every retryable failure
     * @param doneSignal    latch counted down when the record succeeds or is skipped
     * @param record        the record this handler is responsible for
     * @return a handler bound to {@code record}
     */
    private AsyncHandler<? extends AmazonWebServiceRequest, ?> getHandler(final BlockingQueue<Record> toSubmit, final List<Record> failedRecords,
                                                                          final AtomicInteger retryCount, final CountDownLatch doneSignal, final Record record) {
        return new AsyncHandler<AmazonWebServiceRequest, Object>() {
            @Override
            public void onError(Exception exception) {
                if (isRetryable(exception)) { // Throttling or 500 response
                    retryCount.incrementAndGet();
                    requeueForRetry();
                } else if (exception instanceof ItemCollectionSizeLimitExceededException) {
                    // Not Retryable, but from DynamoDB
                    log.error("Local Secondary Index is full: " + record, exception);
                    skipRecordOrExit();
                } else if (exception instanceof AmazonServiceException && 413 == ((AmazonServiceException) exception).getStatusCode()) {
                    log.error("Request entity too large: " + record, exception);
                    skipRecordOrExit();
                } else if (exception instanceof AmazonClientException) {
                    // This block catches unrecoverable AmazonWebServices errors:
                    //
                    // ConditionalCheckFailedException - not possible as we are not making conditional writes
                    // LimitExceededException - not possible for PutItem, UpdateItem, or DeleteItem
                    // ResourceInUseException - not possible for PutItem, UpdateItem, or DeleteItem
                    // ResourceNotFoundException - table does not exist
                    // AmazonServiceException - any unhandled response from the DynamoDB service
                    // AmazonClientException - any other 400 response: validation, authentication, authorization, or configuration exception
                    //
                    log.fatal("Exception emitting record: " + record, exception);
                    System.exit(StatusCodes.EIO);
                } else {
                    // This block catches all other exceptions. Since it was not expected, this is an unrecoverable exception.
                    log.fatal("Abnormal exception emitting record: " + record, exception);
                    System.exit(StatusCodes.EIO);
                }
            }

            /**
             * Puts the record back on {@code toSubmit}, blocking while the queue is full instead of spinning.
             * An interrupt does not abandon the record: the put is retried until it succeeds and the thread's
             * interrupt status is restored afterwards so the caller can still observe it.
             */
            private void requeueForRetry() {
                boolean interrupted = false;
                try {
                    while (true) {
                        try {
                            toSubmit.put(record);
                            return;
                        } catch (InterruptedException e) {
                            // Remember the interrupt and keep trying; dropping the record here would lose it.
                            interrupted = true;
                        }
                    }
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }

            /**
             * Handles a non-retryable failure: records it and releases the latch when {@code skipErrors} is set,
             * otherwise terminates the JVM with {@code StatusCodes.EIO}.
             */
            private void skipRecordOrExit() {
                if (skipErrors) {
                    // NOTE(review): failedRecords is mutated from the callback; presumably the caller supplies a
                    // thread-safe list - verify against the call site.
                    failedRecords.add(record);
                    doneSignal.countDown();
                } else {
                    System.exit(StatusCodes.EIO);
                }
            }

            /** Returns {@code true} for throttling and internal-server errors, the only failures that are retried. */
            private boolean isRetryable(Exception exception) {
                return exception instanceof ProvisionedThroughputExceededException
                    || exception instanceof InternalServerErrorException;
            }

            @Override
            public void onSuccess(AmazonWebServiceRequest request, Object result) {
                log.trace("Record emitted successfully: " + record.getDynamodb().getSequenceNumber());
                doneSignal.countDown();
            }
        };
    }