in src/main/java/com/amazonaws/services/dynamodbv2/streams/connectors/DynamoDBReplicationEmitter.java [246:312]
public List<Record> emit(final UnmodifiableBuffer<Record> buffer) {
    // Writes every record in the buffer through the asynchronous client returned by getDynamodb()
    // and blocks until each record has been accounted for.
    //
    // Returns: the records collected in failedRecords (empty when the emitter is shut down and the
    //          buffer is empty).
    // Throws:  IllegalStateException if the emitter has been shut down and the buffer still holds
    //          records.
    //
    // An interrupt received while waiting does not abort the emit: it is remembered and the
    // thread's interrupt status is restored once the loop has finished.
    if (isShutdown) {
        if (buffer.getRecords().isEmpty()) {
            // This is OK, but not expected
            log.warn("Record processor called emit after calling shutdown. Continuing becuase buffer is empty.");
            return Collections.emptyList();
        }
        throw new IllegalStateException("Cannot emit records after emitter has been shutdown.");
    }

    // Asynchronously process all writes, but block on the results.
    final List<Record> records = buffer.getRecords();
    // Stores records that failed with a non-retryable exception
    final List<Record> failedRecords = Collections.synchronizedList(new ArrayList<Record>());
    // Queue of records to submit
    final BlockingQueue<Record> toSubmit = new LinkedBlockingQueue<Record>(records);
    // Used to detect when all requests have either succeeded or resulted in a non-retryable exception.
    // NOTE(review): presumably the handler built by getHandler(...) counts this latch down and
    // re-queues retryable records onto toSubmit - confirm against getHandler.
    final CountDownLatch doneSignal = new CountDownLatch(records.size());
    final AtomicInteger retryCount = new AtomicInteger();

    boolean interrupted = false;
    try {
        while (doneSignal.getCount() > 0) {
            Record polled = null;
            try {
                polled = toSubmit.poll(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Keep draining; the interrupt status is restored in the finally block below.
                interrupted = true;
            }
            final Record record = polled;
            if (null == record) {
                continue; // Check if all records have completed and if not try to poll again
            }

            // Generate the request based on the record
            final AmazonWebServiceRequest request = createRequest(record);
            if (request == null) {
                // Should only happen if DynamoDB Streams API updates to support different operations
                // than {INSERT, MODIFY, REMOVE}. No handler is created for this record, so it must be
                // counted down here; otherwise the latch never reaches zero and this loop never exits.
                log.warn("Skipping record because no DynamoDB request could be created for it.");
                doneSignal.countDown();
                continue;
            }

            // Submit the write request based on its type
            if (request instanceof PutItemRequest) { // PUT
                getDynamodb().putItemAsync((PutItemRequest) request,
                    (AsyncHandler<PutItemRequest, PutItemResult>) getHandler(toSubmit, failedRecords, retryCount, doneSignal, record));
            } else if (request instanceof DeleteItemRequest) { // DELETE
                getDynamodb().deleteItemAsync((DeleteItemRequest) request,
                    (AsyncHandler<DeleteItemRequest, DeleteItemResult>) getHandler(toSubmit, failedRecords, retryCount, doneSignal, record));
            } else if (request instanceof UpdateItemRequest) { // UPDATE
                getDynamodb().updateItemAsync((UpdateItemRequest) request,
                    (AsyncHandler<UpdateItemRequest, UpdateItemResult>) getHandler(toSubmit, failedRecords, retryCount, doneSignal, record));
            } else {
                // Should only happen if DynamoDB allows a new operation other than {PutItem, DeleteItem,
                // UpdateItem} for single item writes. Nothing was submitted, so no handler will ever
                // count this record down; do it here so the loop can terminate.
                log.warn("Unsupported DynamoDB request: " + request);
                doneSignal.countDown();
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    emitCloudWatchMetrics(records, failedRecords, retryCount);
    if (!records.isEmpty()) {
        log.debug("Successfully emitted " + (records.size() - failedRecords.size()) + " records ending with sequence number "
            + buffer.getLastSequenceNumber());
    } else {
        log.debug("No records to emit");
    }
    return failedRecords;
}