in src/main/java/software/amazon/event/kafkaconnector/offloading/S3EventBridgeEventDetailValueOffloading.java [193:223]
/**
 * Stores {@code payload} as an S3 object and returns the id used as its object key.
 *
 * <p>The lookup goes through {@code s3ObjectKeyCache.computeIfAbsent}, keyed by
 * {@code sha512Of(payload)}. The upload only happens when the cache invokes the mapping function;
 * otherwise the previously stored id is returned and the call is logged as {@code cache=HIT}.
 *
 * <p>NOTE(review): the upload blocks (up to {@code SDK_TIMEOUT} milliseconds) inside the mapping
 * function. If {@code s3ObjectKeyCache} is a {@code ConcurrentHashMap}, other updates hitting the
 * same bin wait for that long - confirm this is acceptable.
 *
 * <p>NOTE(review): checked failures of the upload (timeout, interruption, execution error) pass
 * through {@code ThrowingFunction.wrap}; presumably they are rethrown unchecked - confirm.
 *
 * @param payload content uploaded to {@code bucketName} as UTF-8 text
 * @return the id whose string form is the S3 object key for {@code payload}
 */
private UUID putS3Object(final String payload) {
  // Single-element holder because a lambda can only capture effectively final locals.
  // It stays null when the mapping function is never invoked, which is how a cache hit is detected.
  var durationHolder = new Duration[] {null};
  var s3ObjectId =
      s3ObjectKeyCache.computeIfAbsent(
          sha512Of(payload),
          ThrowingFunction.wrap(
              (sha512) -> {
                var id = idGenerator.get();
                var request =
                    PutObjectRequest.builder().bucket(bucketName).key(id.toString()).build();
                var body = fromString(payload, UTF_8);
                // Monotonic clock: wall-clock adjustments during the upload must not skew
                // (or turn negative) the reported duration.
                var startNanos = System.nanoTime();
                logger.trace("uploading object to s3 with key={}", s3ArnOf(id));
                // Block until the asynchronous upload finishes or SDK_TIMEOUT ms elapse.
                client.putObject(request, body).get(SDK_TIMEOUT, MILLISECONDS);
                durationHolder[0] = Duration.ofNanos(System.nanoTime() - startNanos);
                return id;
              }));
  var duration = durationHolder[0];
  var cacheHit = duration == null;
  long durationMillis = cacheHit ? 0L : duration.toMillis();
  logger.trace(
      "s3 offloading completed: jsonPath='{}' key={} cache={} durationMillis={}",
      jsonPathExp,
      s3ArnOf(s3ObjectId),
      cacheHit ? "HIT" : "MISS",
      durationMillis);
  return s3ObjectId;
}