in streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/bean/RedisContainer.scala [40:96]
/**
 * Issues a single Redis write for `input`, using the command declared by `mapper`.
 *
 * The key and value are extracted from `input` through `mapper`, and the command is read from
 * `mapper.getCommandDescription`. When `transaction` is a `Some`, the command is issued on that
 * transaction; in every other case it is issued on this instance's `container`.
 *
 * For ZADD, ZREM and HSET the command description's additional key is passed as the first
 * argument and the extracted key follows it. On the transaction path ZADD converts the extracted
 * value with `toDouble`; on the container path the value is handed over unchanged.
 *
 * @param mapper      supplies the key, the value and the command description for `input`
 * @param input       the record to write
 * @param transaction optional Jedis transaction on which to issue the command
 * @throws IllegalArgumentException if the command is not one of RPUSH, LPUSH, SADD, SET, PFADD,
 *                                  PUBLISH, ZADD, ZREM or HSET
 */
def invoke[T](
    mapper: RedisMapper[T],
    input: T,
    transaction: Option[redis.clients.jedis.Transaction]): Unit = {
  val key = mapper.getKeyFromData(input)
  val value = mapper.getValueFromData(input)

  // Deliberately a `def`: the additional key is only looked up by the branches that need it
  // (ZADD, ZREM, HSET), and each use fetches a fresh command description.
  def additionalKey = mapper.getCommandDescription.getAdditionalKey

  // Each command has a transactional arm followed by a fallback arm targeting the container.
  (mapper.getCommandDescription.getCommand, transaction) match {
    case (RPUSH, Some(tx)) => tx.rpush(key, value)
    case (RPUSH, _) => this.container.rpush(key, value)

    case (LPUSH, Some(tx)) => tx.lpush(key, value)
    case (LPUSH, _) => this.container.lpush(key, value)

    case (SADD, Some(tx)) => tx.sadd(key, value)
    case (SADD, _) => this.container.sadd(key, value)

    case (SET, Some(tx)) => tx.set(key, value)
    case (SET, _) => this.container.set(key, value)

    case (PFADD, Some(tx)) => tx.pfadd(key, value)
    case (PFADD, _) => this.container.pfadd(key, value)

    case (PUBLISH, Some(tx)) => tx.publish(key, value)
    case (PUBLISH, _) => this.container.publish(key, value)

    // The transaction receives the score as a Double; the container receives the raw value.
    case (ZADD, Some(tx)) => tx.zadd(additionalKey, value.toDouble, key)
    case (ZADD, _) => this.container.zadd(additionalKey, value, key)

    case (ZREM, Some(tx)) => tx.zrem(additionalKey, key)
    case (ZREM, _) => this.container.zrem(additionalKey, key)

    case (HSET, Some(tx)) => tx.hset(additionalKey, key, value)
    case (HSET, _) => this.container.hset(additionalKey, key, value)

    case (unsupported, _) =>
      throw new IllegalArgumentException(
        "[StreamPark] RedisSink:Cannot process such data type: " + unsupported)
  }
}