public void put()

in src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkTask.java [67:114]


  /**
   * Sends the given records through {@code eventBridgeWriter}, re-submitting whatever
   * {@code handleFailedEntries} returns for failed results until nothing is left to send.
   *
   * <p>This loop does not itself compare the attempt counter against {@code config.maxRetries};
   * the counter is only passed to {@code handleFailedEntries}. NOTE(review): presumably that
   * helper enforces the retry limit (e.g. by throwing) - confirm against its implementation.
   *
   * <p>Between attempts the calling thread sleeps for {@code config.retriesDelay} milliseconds.
   * No delay is applied once every record has been handled.
   *
   * @param records the records to send; an empty collection returns immediately
   * @throws ConnectException if the thread is interrupted while waiting before a retry; the
   *     thread's interrupt status is restored before the exception is thrown
   */
  public void put(Collection<SinkRecord> records) {
    log.trace("EventBridgeSinkTask put called with {} records: {}", records.size(), records);
    if (records.isEmpty()) {
      log.trace("Returning early: 0 records received");
      return;
    }

    var remainingRecords = List.copyOf(records);

    // AtomicInteger because the counter is read inside the stream lambda below, which can only
    // capture effectively final locals.
    var attempts = new AtomicInteger(0);
    var start = OffsetDateTime.now(ZoneOffset.UTC);

    while (!remainingRecords.isEmpty()) {
      attempts.incrementAndGet();
      log.trace(
          "putItems call started: start={} attempts={} maxRetries={}",
          start,
          attempts,
          config.maxRetries);

      // Keep only the failed results and let handleFailedEntries decide which records (if any)
      // go into the next attempt.
      remainingRecords =
          eventBridgeWriter.putItems(remainingRecords).stream()
              .filter(EventBridgeResult::isFailure)
              .flatMap(it -> handleFailedEntries(it.failure(), attempts.get()))
              .collect(toList());

      // only report successful putEvents calls
      // NOTE(review): this runs on every iteration with the full batch size, even while some
      // records are still pending a retry - confirm this matches the reporter's semantics.
      statusReporter.setSentRecords(records.size());

      // Only wait when another attempt is actually needed; a fully handled batch returns
      // without paying the retry delay.
      if (!remainingRecords.isEmpty()) {
        try {
          // TODO: improve retry delay implementation
          TimeUnit.MILLISECONDS.sleep(config.retriesDelay);
        } catch (InterruptedException ie) {
          // Restore the interrupt flag so code further up the stack can still observe it.
          Thread.currentThread().interrupt();
          throw new ConnectException(ie);
        }
      }
    }

    var completion = OffsetDateTime.now(ZoneOffset.UTC);
    log.trace(
        "putItems call completed: start={} completion={} durationMillis={} "
            + "attempts={} "
            + "maxRetries={}",
        start,
        completion,
        Duration.between(start, completion).toMillis(),
        attempts,
        config.maxRetries);
  }