protected synchronized BufferSendResult attemptSend()

in tst/com/amazon/kinesis/streaming/agent/tailing/testing/FileSender.java [109:174]


    /**
     * Simulates sending a buffer by appending its records to {@code outputFile},
     * injecting artificial latency, errors and partial failures along the way.
     * <p>
     * Steps, in order:
     * <ol>
     *   <li>Sleep via {@code TestUtils.sleep(averageLatencyMillis, latencyJitter)}.</li>
     *   <li>Possibly throw a simulated error before anything is written
     *       (governed by {@code errorBeforeCommitRate}).</li>
     *   <li>Write records one by one, skipping some of them when a partial
     *       failure was decided for this attempt.</li>
     *   <li>At one randomly chosen record index, possibly throw a simulated
     *       error after part of the buffer has been written (governed by
     *       {@code errorAfterPartialCommitRate}).</li>
     *   <li>Update the attempted/sent/failed counters and build the result.</li>
     * </ol>
     * The counters are only updated when the loop finishes without an exception.
     * An empty buffer writes nothing and is reported as succeeded.
     *
     * @param buffer the buffer whose records are appended to the output file
     * @return {@code BufferSendResult.succeeded(buffer)} when every record was
     *         written; otherwise {@code BufferSendResult.succeeded_partially(...)}
     *         built from the result of {@code buffer.remove(successfulRecords)}
     *         and the original record count
     * @throws RuntimeException for any simulated error or I/O failure (rethrown
     *         through {@code Throwables.propagate}); {@code totalErrors} is
     *         incremented in that case
     */
    protected synchronized BufferSendResult<R> attemptSend(RecordBuffer<R> buffer) {
        Stopwatch timer = Stopwatch.createStarted();
        try {
            long sleepMillis = TestUtils.sleep(averageLatencyMillis, latencyJitter);
            LOGGER.trace("{}:{} Slept for {}ms", name(), buffer, sleepMillis);
        } catch (InterruptedException e) {
            // When interrupted, proceed with the send, but keep the interrupt flag set
            // for the caller.
            // NOTE(review): FileChannel is an interruptible channel, so with the flag
            // restored here the write below may fail with ClosedByInterruptException --
            // confirm that this is the intended behavior.
            Thread.currentThread().interrupt();
        }
        try {
            TestUtils.throwOccasionalError(errorBeforeCommitRate, new RuntimeException("Simulated service error in FileSender#attemptSend"));
            List<Integer> successfulRecords = new ArrayList<>();
            int recordCount = buffer.sizeRecords();

            // Pick an index where we check if it's time to raise an error after a partial commit.
            // ThreadLocalRandom#nextInt(bound) rejects a non-positive bound, so for an empty
            // buffer use an index that can never match: there is no record at which a
            // partial commit could happen.
            int indexForAfterCommitError = recordCount > 0
                    ? ThreadLocalRandom.current().nextInt(recordCount) : -1;

            // Decide ahead of time if we're going to have a partial failure, and if so the
            // maximum number of records (between 1 and max(1, recordCount / 4)) that may fail.
            int maxPartialFailures = TestUtils.decide(partialFailureRate) ?
                    (1 + ThreadLocalRandom.current().nextInt(Math.max(1, recordCount / 4))) : 0;
            // If we're going to have a partial failure, we should ensure that at least one record fails
            int minPartialFailures = maxPartialFailures > 0 ? 1 : 0;
            try (FileChannel channel = FileChannel.open(outputFile, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
                int index = 0;
                for (R record : buffer) {
                    // A record is written when no failure is still owed (minPartialFailures == 0)
                    // and either the failure budget is exhausted or a coin flip says "write".
                    if (minPartialFailures == 0 && (maxPartialFailures == 0 || TestUtils.decide(0.5))) {
                        successfulRecords.add(index);
                        // duplicate() so that the record's own buffer position is left untouched
                        channel.write(record.data().duplicate());
                    } else {
                        --maxPartialFailures;
                        minPartialFailures = Math.max(minPartialFailures - 1, 0);
                    }
                    if (index == indexForAfterCommitError) {
                        // Keep track of possible duplicates, in case we raise an exception.
                        // This counts every record up to and including this index, whether or
                        // not it was actually written above.
                        expectedDuplicateRecords.addAndGet(index + 1);
                        TestUtils.throwOccasionalError(errorAfterPartialCommitRate, new RuntimeException("Simulated failure after partial commit in FileSender#attemptSend"));
                        // No exception raised == no duplicates for this buffer
                        expectedDuplicateRecords.addAndGet(-index - 1);
                    }
                    ++index;
                }
                int totalRecords = buffer.sizeRecords();
                int sentRecords = successfulRecords.size();
                int failedRecords = totalRecords - sentRecords;
                totalRecordsAttempted.addAndGet(totalRecords);
                totalRecordsSent.addAndGet(sentRecords);
                totalRecordsFailed.addAndGet(failedRecords);
                if (sentRecords == totalRecords) {
                    LOGGER.trace("{}:{} Buffer written to file successfully.",
                            name(), buffer);
                    return BufferSendResult.succeeded(buffer);
                } else {
                    LOGGER.trace("{}:{} Buffer written to file with partial failure: {} written, {} failed.",
                            name(), buffer, sentRecords, failedRecords);
                    // Hand back the result of removing the written records; the original
                    // record count is passed alongside it.
                    buffer = buffer.remove(successfulRecords);
                    return BufferSendResult.succeeded_partially(buffer, totalRecords);
                }
            } catch (IOException e) {
                LOGGER.trace("{}:{} error writing buffer data.", name(), buffer, e);
                throw Throwables.propagate(e);
            } finally {
                LOGGER.trace("{}:{} Total buffer send time: {}ms", name(), buffer, timer.elapsed(TimeUnit.MILLISECONDS));
            }
        } catch (Exception e) {
            // Every failure, simulated or real, counts as one send error.
            totalErrors.incrementAndGet();
            throw Throwables.propagate(e);
        }
    }