in flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java [138:204]
public void invoke(IN input, Context context) throws Exception {
String key = redisSinkMapper.getKeyFromData(input);
String value = redisSinkMapper.getValueFromData(input);
Optional<String> optAdditionalKey = redisSinkMapper.getAdditionalKey(input);
Optional<Integer> optAdditionalTTL = redisSinkMapper.getAdditionalTTL(input);
switch (redisCommand) {
case RPUSH:
this.redisCommandsContainer.rpush(key, value);
break;
case LPUSH:
this.redisCommandsContainer.lpush(key, value);
break;
case SADD:
this.redisCommandsContainer.sadd(key, value);
break;
case SREM:
this.redisCommandsContainer.srem(key, value);
break;
case SET:
this.redisCommandsContainer.set(key, value);
break;
case SETEX:
this.redisCommandsContainer.setex(key, value, optAdditionalTTL.orElse(this.additionalTTL));
break;
case PFADD:
this.redisCommandsContainer.pfadd(key, value);
break;
case PUBLISH:
this.redisCommandsContainer.publish(key, value);
break;
case ZADD:
this.redisCommandsContainer.zadd(optAdditionalKey.orElse(this.additionalKey), value, key);
break;
case ZINCRBY:
this.redisCommandsContainer.zincrBy(optAdditionalKey.orElse(this.additionalKey), value, key);
break;
case ZREM:
this.redisCommandsContainer.zrem(optAdditionalKey.orElse(this.additionalKey), key);
break;
case HSET:
this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey), key, value,
optAdditionalTTL.orElse(this.additionalTTL));
break;
case HINCRBY:
this.redisCommandsContainer.hincrBy(optAdditionalKey.orElse(this.additionalKey), key, Long.valueOf(value), optAdditionalTTL.orElse(this.additionalTTL));
break;
case HDEL:
this.redisCommandsContainer.hdel(optAdditionalKey.orElse(this.additionalKey), key);
break;
case INCRBY:
this.redisCommandsContainer.incrBy(key, Long.valueOf(value));
break;
case INCRBY_EX:
this.redisCommandsContainer.incrByEx(key, Long.valueOf(value), optAdditionalTTL.orElse(this.additionalTTL));
break;
case DECRBY:
this.redisCommandsContainer.decrBy(key, Long.valueOf(value));
break;
case DESCRBY_EX:
this.redisCommandsContainer.decrByEx(key, Long.valueOf(value), optAdditionalTTL.orElse(this.additionalTTL));
break;
default:
throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
}
}