in services/tracking-service/reactive-vertx/src/main/java/com/amazon/verticles/KinesisVerticle.java [49:74]
/**
 * Starts the verticle: creates the Kinesis client, resolves the target stream name and
 * registers an event-bus consumer on {@code KINESIS_EVENTBUS_ADDRESS}.
 *
 * <p>Each incoming message body is decoded from JSON into a {@code TrackingMessage},
 * serialized with {@code createMessage} and handed to {@code sendMessageToKinesis}, using
 * the tracking message ID as the partition key. On success the sender receives the reply
 * {@code "OK"}; on any failure the message is failed explicitly so that a request/reply
 * sender gets an error instead of waiting for a reply timeout.
 */
public void start() {
    EventBus eb = vertx.eventBus();
    kinesisAsyncClient = createClient();

    // Read the environment once; fall back to the default stream name when it is unset.
    String configuredStream = System.getenv(STREAM_NAME);
    eventStream = configuredStream == null ? "EventStream" : configuredStream;

    eb.<String>consumer(KINESIS_EVENTBUS_ADDRESS).handler(message -> {
        try {
            TrackingMessage trackingMessage = Json.decodeValue(message.body(), TrackingMessage.class);
            String partitionKey = trackingMessage.getMessageId();
            byte[] byteMessage = createMessage(trackingMessage);
            sendMessageToKinesis(byteMessage, partitionKey);
            // NOTE(review): the reply is sent as soon as sendMessageToKinesis returns; if that
            // call is asynchronous, "OK" does not confirm delivery to the stream - confirm.
            message.reply("OK");
        } catch (KinesisException exc) {
            LOGGER.severe("Failed to send tracking message to Kinesis: " + exc);
            // Tell the sender about the failure instead of leaving the request unanswered.
            message.fail(500, "Failed to send message to Kinesis: " + exc.getMessage());
        } catch (RuntimeException exc) {
            // Covers malformed or null payloads (JSON decoding errors) and any other
            // unexpected failure, so the handler never leaves a sender without an answer.
            LOGGER.severe("Failed to process tracking message: " + exc);
            message.fail(400, "Failed to process tracking message: " + exc.getMessage());
        }
    });
}