public void start()

in services/tracking-service/reactive-vertx/src/main/java/com/amazon/verticles/KinesisVerticle.java [49:74]


    public void start() {

        EventBus eb = vertx.eventBus();

        kinesisAsyncClient = createClient();
        eventStream = System.getenv(STREAM_NAME) == null ? "EventStream" : System.getenv(STREAM_NAME);

        eb
                .<String>consumer(KINESIS_EVENTBUS_ADDRESS)
                .handler(message -> {
                    try {
                        TrackingMessage trackingMessage = Json.decodeValue(message.body(), TrackingMessage.class);
                        String partitionKey = trackingMessage.getMessageId();

                        byte [] byteMessage = createMessage(trackingMessage);

                        sendMessageToKinesis(byteMessage, partitionKey);

                        // Now send back reply
                        message.reply("OK");
                    }
                    catch (KinesisException exc) {
                        LOGGER.severe(exc.getMessage());
                    }
                });
    }