in tst/com/amazon/kinesis/streaming/agent/tailing/testing/FileSender.java [109:174]
/**
 * Simulates sending a buffer by appending the data of its records to {@code outputFile},
 * injecting latency and failures according to this sender's configured rates.
 *
 * <p>Sequence of events:
 * <ol>
 *   <li>Calls {@code TestUtils.sleep(averageLatencyMillis, latencyJitter)} to simulate latency.
 *       If the sleep is interrupted the send still proceeds; the interrupt status is restored
 *       just before this method exits.</li>
 *   <li>May raise a simulated error before anything is written ({@code errorBeforeCommitRate}).</li>
 *   <li>Writes records one by one; when a partial failure was decided up front, the first record
 *       is always skipped and further records may be skipped up to the chosen maximum.</li>
 *   <li>At one randomly chosen record index, may raise a simulated error after some records
 *       were already written ({@code errorAfterPartialCommitRate}).</li>
 * </ol>
 *
 * @param buffer the records to write; an empty buffer writes nothing and is reported as succeeded
 * @return {@code BufferSendResult.succeeded(buffer)} if every record was written, otherwise
 *         {@code BufferSendResult.succeeded_partially(...)} carrying the buffer with the written
 *         records removed and the original record count
 * @throws RuntimeException on a simulated error, or wrapping an {@link IOException} raised while
 *         opening or writing the file (via {@code Throwables.propagate}); every exception that
 *         escapes also increments {@code totalErrors}
 */
protected synchronized BufferSendResult<R> attemptSend(RecordBuffer<R> buffer) {
    Stopwatch timer = Stopwatch.createStarted();
    // Remember an interruption instead of re-asserting it right away: FileChannel is an
    // interruptible channel, so writing with the interrupt status set would close the channel
    // and fail with ClosedByInterruptException rather than "proceeding with the send".
    boolean interrupted = false;
    try {
        long sleepMillis = TestUtils.sleep(averageLatencyMillis, latencyJitter);
        LOGGER.trace("{}:{} Slept for {}ms", name(), buffer, sleepMillis);
    } catch (InterruptedException e) {
        // When interrupted, proceed with the send; the status is restored in the finally below.
        interrupted = true;
    }
    try {
        TestUtils.throwOccasionalError(errorBeforeCommitRate, new RuntimeException("Simulated service error in FileSender#attemptSend"));
        List<Integer> successfulRecords = new ArrayList<>();
        int recordCount = buffer.sizeRecords();
        // Pick an index where we check if it's time to raise an error after a partial commit.
        // nextInt(bound) rejects a non-positive bound, so an empty buffer gets no checkpoint (-1).
        int indexForAfterCommitError = recordCount > 0
                ? ThreadLocalRandom.current().nextInt(recordCount)
                : -1;
        // Decide ahead of time if we're going to have a partial failure
        int maxPartialFailures = TestUtils.decide(partialFailureRate)
                ? (1 + ThreadLocalRandom.current().nextInt(Math.max(1, recordCount / 4)))
                : 0;
        // If we're going to have a partial failure, we should ensure that at least one record fails
        int minPartialFailures = maxPartialFailures > 0 ? 1 : 0;
        try (FileChannel channel = FileChannel.open(outputFile, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            int index = 0;
            for (R record : buffer) {
                if (minPartialFailures == 0 && (maxPartialFailures == 0 || TestUtils.decide(0.5))) {
                    successfulRecords.add(index);
                    // Write a duplicate view so the record's own buffer position is left untouched.
                    channel.write(record.data().duplicate());
                } else {
                    // Simulated per-record failure: skip the write and consume one failure slot.
                    --maxPartialFailures;
                    minPartialFailures = Math.max(minPartialFailures - 1, 0);
                }
                if (index == indexForAfterCommitError) {
                    // Keep track of possible duplicates, in case we raise an exception.
                    // NOTE(review): index + 1 also counts records skipped as partial failures
                    // above, which were never written - confirm this is meant as an upper bound.
                    expectedDuplicateRecords.addAndGet(index + 1);
                    TestUtils.throwOccasionalError(errorAfterPartialCommitRate, new RuntimeException("Simulated failure after partial commit in FileSender#attemptSend"));
                    // No exception raised == no duplicates for this buffer
                    expectedDuplicateRecords.addAndGet(-index - 1);
                }
                ++index;
            }
            int totalRecords = buffer.sizeRecords();
            totalRecordsAttempted.addAndGet(totalRecords);
            totalRecordsSent.addAndGet(successfulRecords.size());
            totalRecordsFailed.addAndGet(totalRecords - successfulRecords.size());
            if (successfulRecords.size() == totalRecords) {
                LOGGER.trace("{}:{} Buffer written to file successfully.",
                        name(), buffer);
                return BufferSendResult.succeeded(buffer);
            } else {
                LOGGER.trace("{}:{} Buffer written to file with partial failure: {} written, {} failed.",
                        name(), buffer, successfulRecords.size(),
                        totalRecords - successfulRecords.size());
                // Hand back only the records that were not written.
                buffer = buffer.remove(successfulRecords);
                return BufferSendResult.succeeded_partially(buffer, totalRecords);
            }
        } catch (IOException e) {
            LOGGER.trace("{}:{} error writing buffer data.", name(), buffer, e);
            throw Throwables.propagate(e);
        } finally {
            LOGGER.trace("{}:{} Total buffer send time: {}ms", name(), buffer, timer.elapsed(TimeUnit.MILLISECONDS));
        }
    } catch (Exception e) {
        totalErrors.incrementAndGet();
        throw Throwables.propagate(e);
    } finally {
        if (interrupted) {
            // Restore the interrupt status for the caller now that the file I/O is done.
            Thread.currentThread().interrupt();
        }
    }
}