private EventBridgeResult apply()

in src/main/java/software/amazon/event/kafkaconnector/offloading/S3EventBridgeEventDetailValueOffloading.java [149:187]


  /**
   * Replaces the part of the entry's detail selected by {@code jsonPathRemove} with a reference to
   * an externally stored copy of that value.
   *
   * <p>The detail JSON is parsed with {@code jsonPathConfiguration}. When {@code jsonPathRemove}
   * yields a non-null value, the value is serialized ({@link Map} and {@link List} values as JSON,
   * anything else via {@code toString()}), removed from the document, and passed to {@code
   * putS3Object}; the result of {@code s3ArnOf(...)} is stored under the key {@code dataref} and
   * {@code jsonPathExp} under the key {@code datarefJsonPath}, both at {@code jsonPathAdd}. When
   * nothing is selected the document is left untouched. In both cases the returned entry is a copy
   * whose detail is the re-serialized document.
   *
   * <p>Failure classification for a caught {@code ThrowingFunctionApplyException}:
   *
   * <ul>
   *   <li>cause is an {@link ExecutionException}, {@link InterruptedException} or {@link
   *       TimeoutException} whose own cause is an {@code S3Exception} with a status code below
   *       500: {@code panic} with the wrapper exception;
   *   <li>any other {@link ExecutionException}, {@link InterruptedException} or {@link
   *       TimeoutException}: {@code retry} with the wrapper exception;
   *   <li>any other cause: {@code panic} with that cause;
   *   <li>no cause at all: {@code panic} with the wrapper exception.
   * </ul>
   *
   * @param item the sink record together with the entry mapped from it
   * @return a success result carrying the rewritten entry, or a failure result for the sink record
   */
  private EventBridgeResult<PutEventsRequestEntry> apply(
      final MappedSinkRecord<PutEventsRequestEntry> item) {

    var sinkRecord = item.getSinkRecord();
    var putEventsRequestEntry = item.getValue();

    try {
      var documentContext = JsonPath.parse(putEventsRequestEntry.detail(), jsonPathConfiguration);

      var value = documentContext.read(jsonPathRemove);
      if (value != null) {
        // Structured values are re-serialized as JSON; scalars use their string form.
        var payload =
            value instanceof Map || value instanceof List
                ? JsonPath.parse(value).jsonString()
                : value.toString();
        documentContext
            .delete(jsonPathRemove)
            .put(jsonPathAdd, "dataref", s3ArnOf(putS3Object(payload)))
            .put(jsonPathAdd, "datarefJsonPath", jsonPathExp);
      }

      return success(
          sinkRecord, putEventsRequestEntry.copy(it -> it.detail(documentContext.jsonString())));

    } catch (final ThrowingFunctionApplyException e) {
      var unwrapped = e.getCause();
      if (unwrapped == null) {
        // Nothing to classify: report the wrapper itself instead of handing null to panic(...).
        return failure(sinkRecord, panic(e));
      }

      if (unwrapped instanceof InterruptedException) {
        // The interruption was consumed where the exception was thrown; restore the flag so code
        // further up the stack can still observe it.
        Thread.currentThread().interrupt();
      }

      if (unwrapped instanceof ExecutionException
          || unwrapped instanceof InterruptedException
          || unwrapped instanceof TimeoutException) {

        var cause = unwrapped.getCause();
        // S3 status codes below 500 are not retried; everything else in this group is.
        if (cause instanceof S3Exception && ((S3Exception) cause).statusCode() < 500) {
          return failure(sinkRecord, panic(e));
        }
        return failure(sinkRecord, retry(e));
      }
      return failure(sinkRecord, panic(unwrapped));
    }
  }