public void invoke()

in flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java [138:204]


    /**
     * Writes a single record to Redis using the command this sink is configured with.
     *
     * <p>The key and value are obtained from {@code redisSinkMapper}. For commands that take an
     * additional key or a TTL, the per-record value supplied by the mapper is used when present;
     * otherwise the sink's own {@code additionalKey} / {@code additionalTTL} field is used.
     *
     * @param input the record to write
     * @param context not used by this method
     * @throws NumberFormatException if the command takes a numeric amount and the mapped value
     *         cannot be parsed as a {@code long}
     * @throws IllegalArgumentException if {@code redisCommand} is not one of the commands
     *         handled below
     */
    public void invoke(IN input, Context context) throws Exception {
        // The mapper is always consulted for all four pieces, in this order, before dispatching.
        final String mappedKey = redisSinkMapper.getKeyFromData(input);
        final String mappedValue = redisSinkMapper.getValueFromData(input);
        final Optional<String> keyOverride = redisSinkMapper.getAdditionalKey(input);
        final Optional<Integer> ttlOverride = redisSinkMapper.getAdditionalTTL(input);

        // Fallbacks and numeric conversions are resolved inside each case, so only the
        // commands that actually need them touch the overrides or parse the value.
        switch (redisCommand) {
            // ---- commands taking just the mapped key and value ----
            case SET:
                redisCommandsContainer.set(mappedKey, mappedValue);
                break;
            case RPUSH:
                redisCommandsContainer.rpush(mappedKey, mappedValue);
                break;
            case LPUSH:
                redisCommandsContainer.lpush(mappedKey, mappedValue);
                break;
            case SADD:
                redisCommandsContainer.sadd(mappedKey, mappedValue);
                break;
            case SREM:
                redisCommandsContainer.srem(mappedKey, mappedValue);
                break;
            case PFADD:
                redisCommandsContainer.pfadd(mappedKey, mappedValue);
                break;
            case PUBLISH:
                redisCommandsContainer.publish(mappedKey, mappedValue);
                break;

            // ---- key/value plus a TTL ----
            case SETEX: {
                final Integer ttl = ttlOverride.orElse(this.additionalTTL);
                redisCommandsContainer.setex(mappedKey, mappedValue, ttl);
                break;
            }

            // ---- commands addressed through the additional key ----
            // Note the argument order: for ZADD/ZINCRBY the mapped value is passed before the mapped key.
            case ZADD: {
                final String outerKey = keyOverride.orElse(this.additionalKey);
                redisCommandsContainer.zadd(outerKey, mappedValue, mappedKey);
                break;
            }
            case ZINCRBY: {
                final String outerKey = keyOverride.orElse(this.additionalKey);
                redisCommandsContainer.zincrBy(outerKey, mappedValue, mappedKey);
                break;
            }
            case ZREM: {
                final String outerKey = keyOverride.orElse(this.additionalKey);
                redisCommandsContainer.zrem(outerKey, mappedKey);
                break;
            }
            case HSET: {
                final String outerKey = keyOverride.orElse(this.additionalKey);
                final Integer ttl = ttlOverride.orElse(this.additionalTTL);
                redisCommandsContainer.hset(outerKey, mappedKey, mappedValue, ttl);
                break;
            }
            case HINCRBY: {
                final String outerKey = keyOverride.orElse(this.additionalKey);
                final Long amount = Long.valueOf(mappedValue);
                final Integer ttl = ttlOverride.orElse(this.additionalTTL);
                redisCommandsContainer.hincrBy(outerKey, mappedKey, amount, ttl);
                break;
            }
            case HDEL: {
                final String outerKey = keyOverride.orElse(this.additionalKey);
                redisCommandsContainer.hdel(outerKey, mappedKey);
                break;
            }

            // ---- counters: the mapped value is parsed as a long ----
            case INCRBY: {
                final Long amount = Long.valueOf(mappedValue);
                redisCommandsContainer.incrBy(mappedKey, amount);
                break;
            }
            case INCRBY_EX: {
                final Long amount = Long.valueOf(mappedValue);
                final Integer ttl = ttlOverride.orElse(this.additionalTTL);
                redisCommandsContainer.incrByEx(mappedKey, amount, ttl);
                break;
            }
            case DECRBY: {
                final Long amount = Long.valueOf(mappedValue);
                redisCommandsContainer.decrBy(mappedKey, amount);
                break;
            }
            case DESCRBY_EX: {
                final Long amount = Long.valueOf(mappedValue);
                final Integer ttl = ttlOverride.orElse(this.additionalTTL);
                redisCommandsContainer.decrByEx(mappedKey, amount, ttl);
                break;
            }

            default:
                throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
        }
    }