in streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala [70:103]
/**
 * Writes a single record to the JDBC target.
 *
 * The record is converted to a SQL string by the converter selected through `apiType`
 * (`scalaToSQLFn` for the Scala API, `javaToSQLFunc` for the Java API). What happens next
 * depends on `batchSize`:
 *
 *  - `1`: the SQL is prepared, executed and committed immediately, and the prepared
 *    statement is closed again before returning.
 *  - any other value: the SQL is appended to the batch on the shared `statement`, and
 *    `execBatch()` is invoked once the record counter reaches a multiple of the batch size
 *    or more than 1000 ms have passed since `timestamp`.
 *
 * @param value   the record to write
 * @param context the sink context supplied by the caller; not used by this method
 * @throws IllegalArgumentException if `connection` is `null`
 * @throws Exception any exception raised while executing or batching the SQL; it is logged
 *                   together with the offending SQL and rethrown
 */
override def invoke(value: T, context: SinkFunction.Context): Unit = {
  require(connection != null)
  val sql = apiType match {
    case ApiType.scala => scalaToSQLFn(value)
    case ApiType.java => javaToSQLFunc.transform(value)
  }
  batchSize match {
    case 1 =>
      try {
        val prepared = connection.prepareStatement(sql)
        // The field is still updated as before so that any code reading `statement`
        // observes the same state; closing an already closed JDBC statement is a no-op.
        statement = prepared
        try {
          prepared.executeUpdate()
          connection.commit()
        } finally {
          // A new statement is prepared for every record, so each one must be released
          // here; otherwise statements pile up on the connection.
          prepared.close()
        }
      } catch {
        // Only Exception is intercepted (to log the SQL). Anything else, e.g.
        // OutOfMemoryError, propagates untouched instead of being silently dropped.
        case e: Exception =>
          logError(s"JdbcSink invoke error:$sql")
          throw e
      }
    case batch =>
      try {
        statement.addBatch(sql)
        (offset.incrementAndGet() % batch, System.currentTimeMillis()) match {
          // Counter hit a multiple of the batch size.
          case (0, _) => execBatch()
          // More than one second since `timestamp`
          // NOTE(review): presumably the time of the last flush - confirm in execBatch().
          case (_, current) if current - timestamp > 1000 => execBatch()
          case _ =>
        }
      } catch {
        // Same policy as the single-statement path: log and rethrow exceptions, never
        // swallow fatal errors.
        case e: Exception =>
          logError(s"JdbcSink batch invoke error:$sql")
          throw e
      }
  }
}