override def invoke()

in streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala [70:103]


  /**
   * Writes a single record through the JDBC connection.
   *
   * The record is first turned into a SQL string by the converter selected via `apiType`.
   * When `batchSize` is 1 the SQL is prepared, executed and committed immediately. Otherwise
   * the SQL is added to the current batch, and `execBatch()` is called once the running record
   * counter reaches a multiple of the batch size or more than 1000 ms have elapsed since
   * `timestamp` (presumably the time of the last flush; it is maintained outside this method).
   *
   * Exceptions are logged together with the offending SQL and rethrown. Throwables that are
   * not `Exception`s (for example `OutOfMemoryError`) are deliberately not caught: they
   * propagate to the caller so that a failed write is never reported as successful.
   *
   * @param value   the record to write
   * @param context sink context; not used by this method
   * @throws IllegalArgumentException if `connection` is null
   */
  override def invoke(value: T, context: SinkFunction.Context): Unit = {
    require(connection != null)
    val sql = apiType match {
      case ApiType.scala => scalaToSQLFn(value)
      case ApiType.java => javaToSQLFunc.transform(value)
    }
    batchSize match {
      case 1 =>
        try {
          // A new statement is prepared for every record; close the one left over from the
          // previous call first, otherwise each record leaks a statement handle.
          Option(statement).foreach(_.close())
          val prepared = connection.prepareStatement(sql)
          // Keep the field pointing at the live statement, as before.
          statement = prepared
          prepared.executeUpdate()
          connection.commit()
        } catch {
          case e: Exception =>
            logError(s"JdbcSink invoke error:$sql")
            throw e
        }
      case batch =>
        try {
          statement.addBatch(sql)
          val written = offset.incrementAndGet()
          val elapsed = System.currentTimeMillis() - timestamp
          // Flush when the batch is full, or when the last flush is more than one second old.
          if (written % batch == 0 || elapsed > 1000) {
            execBatch()
          }
        } catch {
          case e: Exception =>
            logError(s"JdbcSink batch invoke error:$sql")
            throw e
        }
    }
  }