in src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkTask.java [67:114]
public void put(Collection<SinkRecord> records) {
log.trace("EventBridgeSinkTask put called with {} records: {}", records.size(), records);
if (records.size() == 0) {
log.trace("Returning early: 0 records received");
return;
}
var remainingRecords = List.copyOf(records);
var attempts = new AtomicInteger(0);
var start = OffsetDateTime.now(ZoneOffset.UTC);
while (!remainingRecords.isEmpty()) {
attempts.incrementAndGet();
log.trace(
"putItems call started: start={} attempts={} maxRetries={}",
start,
attempts,
config.maxRetries);
remainingRecords =
eventBridgeWriter.putItems(remainingRecords).stream()
.filter(EventBridgeResult::isFailure)
.flatMap(it -> handleFailedEntries(it.failure(), attempts.get()))
.collect(toList());
// only report successful putEvents calls
statusReporter.setSentRecords(records.size());
try {
// TODO: improve retry delay implementation
TimeUnit.MILLISECONDS.sleep(config.retriesDelay);
} catch (InterruptedException ie) {
throw new ConnectException(ie);
}
}
var completion = OffsetDateTime.now(ZoneOffset.UTC);
log.trace(
"putItems call completed: start={} completion={} durationMillis={} "
+ "attempts={} "
+ "maxRetries={}",
start,
completion,
Duration.between(start, completion).toMillis(),
attempts,
config.maxRetries);
}