in src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java [90:127]
/**
 * Asynchronously submits a single event to the given Kinesis stream.
 *
 * <p>The event is handed to the {@code kinesisProducer} as a user record. A callback running on
 * {@code callBackExecutor} logs the outcome: success at FINE level, failure at SEVERE level.
 *
 * @param streamName name of the stream the record is added to
 * @param stringEvent the event payload; it is encoded as UTF-8 before being sent
 * @param partitionKey partition key passed to the producer for this record
 * @return a future yielding {@code true} when the producer returns a non-null result that reports
 *     success, {@code false} otherwise. If submitting the record throws, the exception is logged
 *     and returned as an immediately failed future rather than propagated to the caller.
 */
private ListenableFuture<Boolean> publishAsync(
    String streamName, String stringEvent, String partitionKey) {
  try {
    // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default charset,
    // which would make the bytes on the stream depend on the host's locale settings.
    ByteBuffer payload =
        ByteBuffer.wrap(stringEvent.getBytes(java.nio.charset.StandardCharsets.UTF_8));

    ListenableFuture<UserRecordResult> publishF =
        kinesisProducer.addUserRecord(streamName, partitionKey, payload);

    // The callback is used for logging only; the value handed back to the caller is derived
    // separately by the transform below.
    Futures.addCallback(
        publishF,
        new FutureCallback<UserRecordResult>() {
          @Override
          public void onSuccess(UserRecordResult result) {
            logger.atFine().log(
                "KINESIS PRODUCER - Successfully published event '%s' to shardId '%s' [PK: %s] [Sequence: %s] after %s attempt(s)",
                stringEvent,
                result.getShardId(),
                partitionKey,
                result.getSequenceNumber(),
                result.getAttempts().size());
          }

          @Override
          public void onFailure(Throwable e) {
            logger.atSevere().withCause(e).log(
                "KINESIS PRODUCER - Failed publishing event %s [PK: %s]",
                stringEvent, partitionKey);
          }
        },
        callBackExecutor);

    // Collapse the producer result into a plain success flag for callers.
    return Futures.transform(
        publishF, res -> res != null && res.isSuccessful(), callBackExecutor);
  } catch (Exception e) {
    // Failures raised while submitting the record are reported through the returned future
    // so that callers deal with a single error channel.
    logger.atSevere().withCause(e).log(
        "KINESIS PRODUCER - Error when publishing event %s [PK: %s]", stringEvent, partitionKey);
    return Futures.immediateFailedFuture(e);
  }
}