in src/main/java/com/amazonaws/services/dynamodbv2/streams/connectors/DynamoDBReplicationEmitter.java [314:377]
/**
 * Builds the asynchronous callback used for a single record write.
 *
 * <p>Outcome handling:
 * <ul>
 * <li>Success: logs the record's sequence number at trace level and counts down {@code doneSignal}.</li>
 * <li>Retryable error (throughput exceeded or internal server error): increments {@code retryCount} and puts the
 * record back on {@code toSubmit}. {@code doneSignal} is <em>not</em> counted down on this path.</li>
 * <li>Item-collection-size-limit error or HTTP 413: if {@code skipErrors} is set, the record is appended to
 * {@code failedRecords} and {@code doneSignal} is counted down; otherwise the JVM exits with
 * {@code StatusCodes.EIO}.</li>
 * <li>Anything else: logged as fatal and the JVM exits with {@code StatusCodes.EIO}.</li>
 * </ul>
 *
 * @param toSubmit      queue that receives the record again when the failure is retryable
 * @param failedRecords collector for records that failed permanently but were skipped
 * @param retryCount    counter incremented once per retryable failure
 * @param doneSignal    latch counted down when the record reaches a final state (success or skipped failure)
 * @param record        the record this handler is tracking
 * @return a handler bound to {@code record}
 */
private AsyncHandler<? extends AmazonWebServiceRequest, ?> getHandler(final BlockingQueue<Record> toSubmit, final List<Record> failedRecords,
    final AtomicInteger retryCount, final CountDownLatch doneSignal, final Record record) {
    return new AsyncHandler<AmazonWebServiceRequest, Object>() {

        /**
         * Classifies the failure and either re-queues the record, records it as skipped, or terminates the process.
         */
        @Override
        public void onError(Exception exception) {
            if (isRetryable(exception)) { // Throttling or 500 response
                retryCount.incrementAndGet();
                requeueForRetry();
            } else if (exception instanceof ItemCollectionSizeLimitExceededException) {
                // Not retryable, but from DynamoDB
                skipOrExit("Local Secondary Index is full: ", exception);
            } else if (exception instanceof AmazonServiceException && 413 == ((AmazonServiceException) exception).getStatusCode()) {
                skipOrExit("Request entity too large: ", exception);
            } else {
                // Unrecoverable. For AmazonClientException this covers:
                //
                // ConditionalCheckFailedException - not possible as we are not making conditional writes
                // LimitExceededException - not possible for PutItem, UpdateItem, or DeleteItem
                // ResourceInUseException - not possible for PutItem, UpdateItem, or DeleteItem
                // ResourceNotFoundException - table does not exist
                // AmazonServiceException - any unhandled response from the DynamoDB service
                // AmazonClientException - any other 400 response: validation, authentication, authorization, or configuration exception
                //
                // Any other exception type was not expected at all and is equally unrecoverable.
                final String prefix = (exception instanceof AmazonClientException)
                    ? "Exception emitting record: "
                    : "Abnormal exception emitting record: ";
                log.fatal(prefix + record, exception);
                System.exit(StatusCodes.EIO);
            }
        }

        /**
         * Puts the record back on the submission queue, blocking (instead of spinning) while the queue is full.
         *
         * <p>Interrupts are deliberately not allowed to abort the enqueue: dropping the record here would leave
         * {@code doneSignal} uncounted for it. The interrupt status is restored once the record is queued.
         */
        private void requeueForRetry() {
            boolean interrupted = false;
            try {
                while (true) {
                    try {
                        toSubmit.put(record);
                        return;
                    } catch (InterruptedException e) {
                        // Remember the interrupt and keep trying; see method comment.
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Handles a permanent, per-record failure: logs it, then either records the failure and releases the latch
         * (when {@code skipErrors} is set) or terminates the JVM.
         *
         * @param messagePrefix text logged in front of the record
         * @param exception     the failure reported by the client
         */
        private void skipOrExit(String messagePrefix, Exception exception) {
            log.error(messagePrefix + record, exception);
            if (skipErrors) {
                // NOTE(review): failedRecords is mutated from the callback; confirm the caller passes a
                // thread-safe list if callbacks can run concurrently.
                failedRecords.add(record);
                doneSignal.countDown();
            } else {
                System.exit(StatusCodes.EIO);
            }
        }

        /**
         * @return {@code true} when the failure is a throughput-exceeded or internal-server error
         */
        private boolean isRetryable(Exception exception) {
            return exception instanceof ProvisionedThroughputExceededException
                || exception instanceof InternalServerErrorException;
        }

        /**
         * Marks the record as done once the write has been acknowledged.
         */
        @Override
        public void onSuccess(AmazonWebServiceRequest request, Object result) {
            log.trace("Record emitted successfully: " + record.getDynamodb().getSequenceNumber());
            doneSignal.countDown();
        }
    };
}