private Stream handleFailedEntries()

in src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkTask.java [116:161]


  private Stream<SinkRecord> handleFailedEntries(
      MappedSinkRecord<EventBridgeResult.Error> errorRecord, Integer attempts) {

    var error = errorRecord.getValue();
    var message = error.getMessage();
    var cause = error.getCause();
    switch (error.getType()) {
      case REPORT_ONLY:
        var failure =
            new EventBridgePartialFailureResponse(errorRecord.getSinkRecord(), message, cause);
        if (dlq != null) {
          log.trace("Sending record to dead-letter queue: {}", errorRecord);
          dlq.report(errorRecord.getSinkRecord(), failure);
        } else {
          log.warn("Dead-letter queue not configured: skipping failed record", failure);
        }
        return empty();

      case RETRY:
        if (attempts > config.maxRetries) {
          log.error(
              "Not retrying failed putItems call: reached max retries attempts={} maxRetries={} "
                  + "errorMessage={}",
              attempts,
              config.maxRetries,
              message,
              cause);
          throw new ConnectException(cause);
        }

        log.warn(
            "Retrying failed putItems call: attempts={} maxRetries={} errorMessage={}",
            attempts,
            config.maxRetries,
            message,
            cause);

        return Stream.of(errorRecord.getSinkRecord());

      case PANIC:
        log.error(
            "Non-retryable failed put call: failing connector errorMessage={}", message, cause);
        throw new ConnectException(cause);
    }
    return Stream.empty();
  }