in src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkTask.java [116:161]
/**
 * Decides what to do with a single sink record whose delivery produced an error.
 *
 * <p>The outcome is selected by the error's type:
 *
 * <ul>
 *   <li>{@code REPORT_ONLY}: the record is reported to {@code dlq} when it is set, otherwise a
 *       warning is logged. Nothing is returned for retry.
 *   <li>{@code RETRY}: the record is returned so it can be attempted again, unless {@code
 *       attempts} is greater than {@code config.maxRetries}, in which case the task fails.
 *   <li>{@code PANIC}: the task fails immediately.
 * </ul>
 *
 * Any other type yields an empty stream.
 *
 * @param errorRecord the sink record together with the error recorded for it
 * @param attempts the attempt count supplied by the caller, compared against {@code
 *     config.maxRetries}
 * @return a stream containing the record to retry, or an empty stream when nothing is retried
 * @throws ConnectException wrapping the error's cause, for {@code PANIC} errors and for {@code
 *     RETRY} errors once {@code attempts} exceeds {@code config.maxRetries}
 */
private Stream<SinkRecord> handleFailedEntries(
    MappedSinkRecord<EventBridgeResult.Error> errorRecord, Integer attempts) {
  var failureDetails = errorRecord.getValue();
  var errorMessage = failureDetails.getMessage();
  var rootCause = failureDetails.getCause();

  switch (failureDetails.getType()) {
    case REPORT_ONLY:
      {
        // The failure object is built up front because both branches below use it.
        var partialFailure =
            new EventBridgePartialFailureResponse(
                errorRecord.getSinkRecord(), errorMessage, rootCause);
        if (dlq == null) {
          log.warn("Dead-letter queue not configured: skipping failed record", partialFailure);
          return empty();
        }
        log.trace("Sending record to dead-letter queue: {}", errorRecord);
        dlq.report(errorRecord.getSinkRecord(), partialFailure);
        return empty();
      }

    case RETRY:
      {
        // Still within the retry budget: hand the record back to the caller.
        if (attempts <= config.maxRetries) {
          log.warn(
              "Retrying failed putItems call: attempts={} maxRetries={} errorMessage={}",
              attempts,
              config.maxRetries,
              errorMessage,
              rootCause);
          return Stream.of(errorRecord.getSinkRecord());
        }
        // Retry budget exhausted: surface the failure instead of retrying again.
        log.error(
            "Not retrying failed putItems call: reached max retries attempts={} maxRetries={} "
                + "errorMessage={}",
            attempts,
            config.maxRetries,
            errorMessage,
            rootCause);
        throw new ConnectException(rootCause);
      }

    case PANIC:
      {
        log.error(
            "Non-retryable failed put call: failing connector errorMessage={}",
            errorMessage,
            rootCause);
        throw new ConnectException(rootCause);
      }

    default:
      // Error types not handled above produce nothing to retry.
      return Stream.empty();
  }
}