public List emit()

in src/main/java/com/amazonaws/services/dynamodbv2/streams/connectors/DynamoDBReplicationEmitter.java [246:312]


    /**
     * Submits every record in the buffer as an asynchronous DynamoDB write and blocks until each record has
     * been accounted for.
     * <p>
     * A record is accounted for once the completion latch has been counted down for it. For submitted writes
     * that is delegated to the handler returned by {@code getHandler} (presumably on success or on a
     * non-retryable failure, with retryable failures re-queued onto the submit queue - confirm against
     * {@code getHandler}). Records for which no supported request can be built are counted down here and
     * reported as failed, so that the loop cannot wait forever on a record that was never submitted.
     * <p>
     * If the calling thread is interrupted while polling, the wait continues and the interrupt status is
     * restored before this method returns.
     *
     * @param buffer
     *            buffer holding the records to emit
     * @return the records that could not be emitted; an empty list if the emitter is shut down and the buffer is
     *         empty
     * @throws IllegalStateException
     *             if the emitter has been shut down and the buffer still contains records
     */
    public List<Record> emit(final UnmodifiableBuffer<Record> buffer) {
        if (isShutdown) {
            if (buffer.getRecords().isEmpty()) {
                // This is OK, but not expected
                log.warn("Record processor called emit after calling shutdown. Continuing becuase buffer is empty.");
                return Collections.emptyList();
            } else {
                throw new IllegalStateException("Cannot emit records after emitter has been shutdown.");
            }
        }
        // Asynchronously process all writes, but block on the results.
        List<Record> records = buffer.getRecords();
        // Stores records that failed with a non-retryable exception or that could not be turned into a request
        final List<Record> failedRecords = Collections.synchronizedList(new ArrayList<Record>());
        // Queue of records to submit
        final BlockingQueue<Record> toSubmit = new LinkedBlockingQueue<Record>(records);
        // Used to detect when all requests have either succeeded or resulted in a non-retryable exception
        final CountDownLatch doneSignal = new CountDownLatch(records.size());
        final AtomicInteger retryCount = new AtomicInteger();
        boolean interrupted = false;
        try {
            while (doneSignal.getCount() > 0) {
                Record recordToSubmit = null;
                try {
                    recordToSubmit = toSubmit.poll(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Keep waiting for outstanding writes; the interrupt status is restored in the finally block.
                    interrupted = true;
                }
                final Record record = recordToSubmit;
                if (null == record) {
                    continue; // Check if all records have completed and if not try to poll again
                }
                // Generate the request based on the record
                AmazonWebServiceRequest request = createRequest(record);
                if (request == null) { // Should only happen if DynamoDB Streams API updates to support different operations
                                       // than {INSERT, MODIFY, REMOVE}.
                    // No handler will ever fire for this record, so account for it here; otherwise the latch
                    // never reaches zero and this loop never terminates.
                    log.warn("Unable to create a DynamoDB request for a record; reporting it as failed.");
                    failedRecords.add(record);
                    doneSignal.countDown();
                    continue;
                }
                // Submit the write request based on its type
                if (request instanceof PutItemRequest) { // PUT
                    getDynamodb().putItemAsync((PutItemRequest) request,
                        (AsyncHandler<PutItemRequest, PutItemResult>) getHandler(toSubmit, failedRecords, retryCount, doneSignal, record));
                } else if (request instanceof DeleteItemRequest) { // DELETE
                    getDynamodb().deleteItemAsync((DeleteItemRequest) request,
                        (AsyncHandler<DeleteItemRequest, DeleteItemResult>) getHandler(toSubmit, failedRecords, retryCount, doneSignal, record));
                } else if (request instanceof UpdateItemRequest) { // UPDATE
                    getDynamodb().updateItemAsync((UpdateItemRequest) request,
                        (AsyncHandler<UpdateItemRequest, UpdateItemResult>) getHandler(toSubmit, failedRecords, retryCount, doneSignal, record));
                } else { // Should only happen if DynamoDB allows a new operation other than {PutItem, DeleteItem,
                         // UpdateItem} for single item writes.
                    log.warn("Unsupported DynamoDB request: " + request);
                    // Nothing was submitted, so no handler will count this record down; do it here and report
                    // the record as failed instead of waiting on it forever.
                    failedRecords.add(record);
                    doneSignal.countDown();
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        emitCloudWatchMetrics(records, failedRecords, retryCount);
        if (!records.isEmpty()) {
            log.debug("Successfully emitted " + (records.size() - failedRecords.size()) + " records ending with sequence number "
                + buffer.getLastSequenceNumber());
        } else {
            log.debug("No records to emit");
        }
        return failedRecords;
    }