private void sendMessageToKinesis()

in services/tracking-service/reactive-vertx/src/main/java/com/amazon/verticles/KinesisVerticle.java [83:116]


    private void sendMessageToKinesis(byte [] byteMessage, String partitionKey) throws KinesisException {
        if (null == kinesisAsyncClient) {
            throw new KinesisException("AmazonKinesisAsync is not initialized");
        }

        SdkBytes payload = SdkBytes.fromByteArray(byteMessage);
        PutRecordRequest putRecordRequest = PutRecordRequest.builder()
                .partitionKey(partitionKey)
                .streamName(eventStream)
                .data(payload)
                .build();

        LOGGER.info("Writing to streamName " + eventStream + " using partitionkey " + partitionKey);

        try {
            CompletableFuture<PutRecordResponse> future = kinesisAsyncClient.putRecord(putRecordRequest);

            future.whenComplete((result, e) -> vertx.runOnContext(none -> {
                if (e != null) {
                    LOGGER.severe("Something happened ... 1");
                    LOGGER.severe(e.getMessage());
                    e.printStackTrace();
                } else {
                    String sequenceNumber = result.sequenceNumber();
                    LOGGER.fine("Message sequence number: " + sequenceNumber);
                }
            }));
        }
        catch (Exception exc) {
            LOGGER.severe("Something happened ... 2");
            exc.printStackTrace();
            LOGGER.severe(exc.getMessage());
        }
    }