in services/tracking-service/reactive-vertx/src/main/java/com/amazon/verticles/KinesisVerticle.java [83:116]
private void sendMessageToKinesis(byte [] byteMessage, String partitionKey) throws KinesisException {
if (null == kinesisAsyncClient) {
throw new KinesisException("AmazonKinesisAsync is not initialized");
}
SdkBytes payload = SdkBytes.fromByteArray(byteMessage);
PutRecordRequest putRecordRequest = PutRecordRequest.builder()
.partitionKey(partitionKey)
.streamName(eventStream)
.data(payload)
.build();
LOGGER.info("Writing to streamName " + eventStream + " using partitionkey " + partitionKey);
try {
CompletableFuture<PutRecordResponse> future = kinesisAsyncClient.putRecord(putRecordRequest);
future.whenComplete((result, e) -> vertx.runOnContext(none -> {
if (e != null) {
LOGGER.severe("Something happened ... 1");
LOGGER.severe(e.getMessage());
e.printStackTrace();
} else {
String sequenceNumber = result.sequenceNumber();
LOGGER.fine("Message sequence number: " + sequenceNumber);
}
}));
}
catch (Exception exc) {
LOGGER.severe("Something happened ... 2");
exc.printStackTrace();
LOGGER.severe(exc.getMessage());
}
}