in src/main/java/software/amazon/event/kafkaconnector/offloading/S3EventBridgeEventDetailValueOffloading.java [149:187]
/**
 * Replaces the part of the event detail selected by {@code jsonPathRemove} with a reference to
 * an externally stored copy of that value.
 *
 * <p>The entry's detail is parsed as JSON using {@code jsonPathConfiguration} and the value at
 * {@code jsonPathRemove} is read. When that value is non-null it is turned into a string (a
 * {@link Map} or {@link List} is rendered as JSON, anything else via {@code toString()}), handed
 * to {@code putS3Object}, and removed from the detail. Two keys are then added at {@code
 * jsonPathAdd}: {@code "dataref"}, holding {@code s3ArnOf(...)} of the {@code putS3Object}
 * result, and {@code "datarefJsonPath"}, holding {@code jsonPathExp}. When the value is null the
 * detail is re-serialized without modification.
 *
 * <p>Only {@code ThrowingFunctionApplyException} is handled here; any other exception raised
 * while parsing or rewriting the detail propagates to the caller.
 *
 * @param item the sink record together with the {@code PutEventsRequestEntry} mapped from it
 * @return a success result carrying a copy of the entry with the rewritten detail, or a failure
 *     result wrapping a {@code retry} or {@code panic} error as described inline below
 */
private EventBridgeResult<PutEventsRequestEntry> apply(
    final MappedSinkRecord<PutEventsRequestEntry> item) {
  var sinkRecord = item.getSinkRecord();
  var putEventsRequestEntry = item.getValue();
  try {
    var documentContext = JsonPath.parse(putEventsRequestEntry.detail(), jsonPathConfiguration);
    var value = documentContext.read(jsonPathRemove);
    if (value != null) {
      // Structured values are serialized back to JSON; scalars are stored as their plain
      // string representation.
      var payload =
          value instanceof Map || value instanceof List
              ? JsonPath.parse(value).jsonString()
              : value.toString();
      documentContext
          .delete(jsonPathRemove)
          .put(jsonPathAdd, "dataref", s3ArnOf(putS3Object(payload)))
          .put(jsonPathAdd, "datarefJsonPath", jsonPathExp);
    }
    return success(
        sinkRecord, putEventsRequestEntry.copy(it -> it.detail(documentContext.jsonString())));
  } catch (final ThrowingFunctionApplyException e) {
    var unwrapped = e.getCause();
    if (unwrapped instanceof InterruptedException) {
      // The InterruptedException was caught and wrapped further down, which cleared the
      // thread's interrupt status. Re-assert it so the caller can still observe the
      // interruption after this record has been reported as failed.
      Thread.currentThread().interrupt();
    }
    if (unwrapped instanceof ExecutionException
        || unwrapped instanceof InterruptedException
        || unwrapped instanceof TimeoutException) {
      var cause = unwrapped.getCause();
      // An S3 error with a status code below 500 is reported as panic instead of retry
      // (presumably because repeating the same request would fail the same way).
      if (cause instanceof S3Exception && ((S3Exception) cause).statusCode() < 500) {
        return failure(sinkRecord, panic(e));
      }
      // Everything else in this group (other execution failures, interruption, timeout)
      // is reported as retry.
      return failure(sinkRecord, retry(e));
    }
    // Any other wrapped cause is reported as panic, using the unwrapped cause itself.
    return failure(sinkRecord, panic(unwrapped));
  }
}