def invoke[T]()

in streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/bean/RedisContainer.scala [40:96]


  def invoke[T](
      mapper: RedisMapper[T],
      input: T,
      transaction: Option[redis.clients.jedis.Transaction]): Unit = {
    val key = mapper.getKeyFromData(input)
    val value = mapper.getValueFromData(input)
    mapper.getCommandDescription.getCommand match {
      case RPUSH =>
        transaction match {
          case Some(t) => t.rpush(key, value)
          case _ => this.container.rpush(key, value)
        }
      case LPUSH =>
        transaction match {
          case Some(t) => t.lpush(key, value)
          case _ => this.container.lpush(key, value)
        }
      case SADD =>
        transaction match {
          case Some(t) => t.sadd(key, value)
          case _ => this.container.sadd(key, value)
        }
      case SET =>
        transaction match {
          case Some(t) => t.set(key, value)
          case _ => this.container.set(key, value)
        }
      case PFADD =>
        transaction match {
          case Some(t) => t.pfadd(key, value)
          case _ => this.container.pfadd(key, value)
        }
      case PUBLISH =>
        transaction match {
          case Some(t) => t.publish(key, value)
          case _ => this.container.publish(key, value)
        }
      case ZADD =>
        transaction match {
          case Some(t) => t.zadd(mapper.getCommandDescription.getAdditionalKey, value.toDouble, key)
          case _ => this.container.zadd(mapper.getCommandDescription.getAdditionalKey, value, key)
        }
      case ZREM =>
        transaction match {
          case Some(t) => t.zrem(mapper.getCommandDescription.getAdditionalKey, key)
          case _ => this.container.zrem(mapper.getCommandDescription.getAdditionalKey, key)
        }
      case HSET =>
        transaction match {
          case Some(t) => t.hset(mapper.getCommandDescription.getAdditionalKey, key, value)
          case _ => this.container.hset(mapper.getCommandDescription.getAdditionalKey, key, value)
        }
      case other =>
        throw new IllegalArgumentException(
          "[StreamPark] RedisSink:Cannot process such data type: " + other)
    }
  }