in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [901:954]
private void executeSqlWithPrepareStatement(
String sql, List<RowData> valueList, JdbcRowConverter rowDataConverter, boolean del) throws SQLException {
long start = System.currentTimeMillis();
int retryTime = 0;
while (retryTime++ < maxRetryTime) {
Connection connection = null;
try {
connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
for (RowData rowData : valueList) {
if (existsPrimaryKeys && del) {
rowDataConverter.toExternal(getPrimaryKey(rowData), preparedStatement);
} else {
rowDataConverter.toExternal(rowData, preparedStatement);
}
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
break;
} catch (SQLException exception) {
SQLException throwables = exception;
if (throwables instanceof BatchUpdateException) {
throwables = ((BatchUpdateException) throwables).getNextException();
}
LOG.error(String.format("Execute sql error, sql: %s, retryTimes: %d", sql, retryTime), throwables);
if (retryTime == maxRetryTime) {
if ("strict".equalsIgnoreCase(exceptionMode)) {
throw throwables;
} else {
LOG.warn("Ignore exception {} when execute sql {}", throwables, sql);
sinkSkipCounter.inc();
}
}
try {
// sleep according to retryTimes
Thread.sleep(retryWaitTime);
} catch (Exception e) {
// ignore
}
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception exception) {
LOG.debug("close connection error", exception);
}
}
}
}
long end = System.currentTimeMillis();
reportMetric(valueList, start, end, (long) valueList.size() * sql.length() * 2);
LOG.debug("%s operation succeed on %d records ", !del ? "Delete" : "Write", valueList.size());
}