void registerToEventBusForPubSub()

in services/tracking-service/reactive-vertx/src/main/java/com/amazon/verticles/RedisVerticle.java [103:133]


    /**
     * Wires Redis pub/sub into the Vert.x event bus.
     *
     * <p>Two things happen here:
     * <ol>
     *   <li>A consumer is registered on {@code REDIS_PUBSUB_CHANNEL_VERTX}. For every envelope it
     *       receives, it reads the {@code "value"} object, takes its {@code "message"} string,
     *       parses that string as JSON and sends the result to
     *       {@code CACHE_REDIS_EVENTBUS_ADDRESS}.</li>
     *   <li>A connection is requested from {@code redis} and a {@code SUBSCRIBE} command for
     *       {@code Constants.REDIS_PUBSUB_CHANNEL} is sent on it.</li>
     * </ol>
     *
     * <p>Envelopes that lack a {@code "value"} or {@code "message"}, or whose message is not valid
     * JSON, are logged and skipped. Connection and subscription failures are only logged; this
     * method returns nothing and does not report them to the caller.
     *
     * @param eb event bus used both to consume the Redis pub/sub envelopes and to forward the
     *           decoded payloads
     */
    void registerToEventBusForPubSub(final EventBus eb) {

        // Register a handler for incoming pub/sub messages. The Redis module publishes them on
        // an event bus address named: base address + '.' + redis channel.
        eb
                .<JsonObject>consumer(REDIS_PUBSUB_CHANNEL_VERTX)
                .handler(received -> {
                    // The "value" entry is a JSON doc with the following properties:
                    // channel - The channel to which this message was sent
                    // pattern - Present if the psubscribe command was used; the pattern that matched this message channel
                    // message - The message payload
                    final JsonObject envelope = received.body();
                    final JsonObject value = envelope == null ? null : envelope.getJsonObject("value");
                    LOGGER.info("Received the following message: " + value);
                    if (value == null) {
                        LOGGER.info("Ignoring pub/sub message without a 'value' object");
                        return;
                    }

                    final String message = value.getString("message");
                    if (message == null) {
                        LOGGER.info("Ignoring pub/sub message without a 'message' payload");
                        return;
                    }

                    // Anything can be published on the channel, so the payload is not guaranteed
                    // to be JSON. Guard only the parse step so that a bad payload is skipped
                    // instead of escaping the handler as an uncaught exception.
                    final JsonObject payload;
                    try {
                        payload = new JsonObject(message);
                    } catch (RuntimeException e) {
                        LOGGER.info("Ignoring pub/sub message with malformed JSON payload: " + e.getMessage());
                        return;
                    }
                    eb.send(CACHE_REDIS_EVENTBUS_ADDRESS, payload);
                });

        // This is pub/sub, so we need to get a dedicated connection:
        redis.connect()
                .onSuccess(conn -> {
                    conn
                            .send(cmd(SUBSCRIBE).arg(Constants.REDIS_PUBSUB_CHANNEL))
                            .onSuccess(res -> LOGGER.info("Subscribed to " + Constants.REDIS_PUBSUB_CHANNEL))
                            .onFailure(err -> {
                                LOGGER.info("Subscription failed: " + err.getMessage());
                                // The connection is useless without the subscription; release it
                                // rather than leaving it open.
                                conn.close();
                            });

                })
                .onFailure(err -> LOGGER.info("Failure during connection: " + err.getMessage()));
    }