in services/tracking-service/reactive-vertx/src/main/java/com/amazon/verticles/CacheVerticle.java [79:115]
/**
 * Registers the event-bus consumer that answers cache lookup requests.
 *
 * <p>Each incoming message body is decoded into a {@link TrackingMessage} and its program id
 * is looked up in {@code CACHE}:
 * <ul>
 *   <li><b>Hit</b> - the cached entry is stamped with the message id of the request and
 *       sent back as the reply.</li>
 *   <li><b>Miss</b> - the request is forwarded to {@code Constants.REDIS_EVENTBUS_ADDRESS}.
 *       A non-empty answer is decoded, stored in {@code CACHE} under its program id and
 *       relayed to the requester; an empty answer is relayed unchanged.</li>
 *   <li><b>Redis request failure</b> - the error is logged and an empty {@link JsonObject}
 *       is sent as the reply.</li>
 * </ul>
 *
 * @param eb event bus on which the consumer is registered and through which the Redis
 *           address is queried
 */
private void registerToEventBusToGetData(final EventBus eb) {
    eb
        .<JsonObject>consumer(Constants.CACHE_EVENTBUS_ADDRESS)
        .handler(message -> {
            TrackingMessage trackingMessage = Json.decodeValue(message.body().encode(), TrackingMessage.class);
            // Supplier overload: the string is only built when FINE logging is enabled.
            LOGGER.fine(() -> "Received cache lookup request: " + message.body());

            // Is data stored in cache?
            TrackingMessage value = CACHE.getIfPresent(trackingMessage.getProgramId());
            if (null != value) {
                // Logged before the message id is updated, so the log shows the entry as cached.
                LOGGER.fine(() -> "Message " + Json.encode(value) + " found in cache --> HttpVerticle");
                // NOTE(review): this mutates the instance held by CACHE; if CACHE is shared
                // between verticle instances/threads this may need a copy - confirm.
                value.setMessageId(trackingMessage.getMessageId());
                message.reply(JsonObject.mapFrom(value));
                return;
            }

            JsonObject msgToSend = JsonObject.mapFrom(trackingMessage);
            LOGGER.info("Key " + trackingMessage.getProgramId() + " not found in cache --> Redis");
            eb
                .<JsonObject>request(Constants.REDIS_EVENTBUS_ADDRESS, msgToSend)
                .onSuccess(res -> {
                    JsonObject msg = res.body();
                    if (!msg.isEmpty()) {
                        // Only non-empty answers are cached; an empty answer is relayed as-is.
                        LOGGER.fine(() -> "Message from Redis-Verticle: " + msg);
                        TrackingMessage msgFromRedis = Json.decodeValue(msg.encode(), TrackingMessage.class);
                        CACHE.put(msgFromRedis.getProgramId(), msgFromRedis);
                    }
                    message.reply(msg);
                })
                .onFailure(err -> {
                    // Keep the empty-object reply contract, but do not lose the cause.
                    LOGGER.log(
                        java.util.logging.Level.WARNING,
                        "Redis lookup failed for key " + trackingMessage.getProgramId(),
                        err);
                    message.reply(new JsonObject());
                });
        });
}