in services/tracking-service/reactive-vertx/src/main/java/com/amazon/verticles/RedisVerticle.java [103:133]
/**
 * Bridges Redis pub/sub traffic onto the Vert.x event bus.
 *
 * <p>Two things are set up here:
 * <ol>
 *   <li>an event-bus consumer on {@code REDIS_PUBSUB_CHANNEL_VERTX} that unwraps each
 *       forwarded message, parses its {@code "message"} payload as JSON and sends the
 *       result to {@code CACHE_REDIS_EVENTBUS_ADDRESS};</li>
 *   <li>a dedicated Redis connection on which {@code SUBSCRIBE} is issued for
 *       {@code Constants.REDIS_PUBSUB_CHANNEL}.</li>
 * </ol>
 *
 * <p>Messages that lack a {@code "value"} object or a {@code "message"} string, or whose
 * payload is not valid JSON, are logged and dropped instead of throwing inside the handler.
 * If the subscription cannot be established, the dedicated connection is closed again.
 *
 * @param eb the event bus used both to receive the forwarded Redis messages and to
 *           send the decoded payload onwards
 */
void registerToEventBusForPubSub(final EventBus eb) {
    // NOTE(review): the warn(String) / error(String, Throwable) calls below assume LOGGER is an
    // SLF4J-, Log4j- or Vert.x-style logger - confirm against the field declaration.

    // Register a handler for the incoming messages. The address the Redis module uses is
    // base address + '.' + redis channel.
    eb.<JsonObject>consumer(REDIS_PUBSUB_CHANNEL_VERTX).handler(received -> {
        final JsonObject body = received.body();
        final JsonObject value = body == null ? null : body.getJsonObject("value");
        if (value == null) {
            LOGGER.warn("Ignoring pub/sub message without a 'value' object: " + body);
            return;
        }
        LOGGER.info("Received the following message: " + value);

        // The value is a JSON doc with the following properties:
        //   channel - the channel to which this message was sent
        //   pattern - present if the psubscribe command is used; the pattern that matched
        //             this message's channel
        //   message - the message payload
        final String message = value.getString("message");
        if (message == null) {
            LOGGER.warn("Ignoring pub/sub message without a 'message' payload: " + value);
            return;
        }

        final JsonObject payload;
        try {
            payload = new JsonObject(message);
        } catch (io.vertx.core.json.DecodeException e) {
            // A single malformed publication must not surface as an unhandled exception.
            LOGGER.error("Ignoring pub/sub message with a non-JSON payload: " + message, e);
            return;
        }
        eb.send(CACHE_REDIS_EVENTBUS_ADDRESS, payload);
    });

    // This is pub/sub, so a dedicated connection is needed.
    redis.connect()
        .onSuccess(conn -> conn
            .send(cmd(SUBSCRIBE).arg(Constants.REDIS_PUBSUB_CHANNEL))
            .onSuccess(res -> LOGGER.info("Subscribed to " + Constants.REDIS_PUBSUB_CHANNEL))
            .onFailure(err -> {
                LOGGER.error("Subscription failed: " + err.getMessage(), err);
                // Nothing else holds this connection; release it rather than leak it.
                conn.close();
            }))
        .onFailure(err -> LOGGER.error("Failure during connection: " + err.getMessage(), err));
}