in flink_sink_adbpg_datastream/src/main/java/Adb4PgTableSink.java [544:571]
private void executeSql(String sql) throws SQLException {
int retryTime = 0;
while (retryTime++ < maxRetryTime) {
connect();
try {
if (LOG.isDebugEnabled()) {
LOG.debug(sql);
}
statement.execute(sql);
break;
} catch (SQLException e) {
//e.printStackTrace();
closeConnection();
if ((e.getMessage() != null
&& e.getMessage().indexOf("duplicate key") != -1
&& e.getMessage().indexOf("violates unique constraint") != -1)
|| retryTime >= maxRetryTime - 1) {
throw e;
}
try {
Thread.sleep(retryWaitTime);
} catch (Exception e1) {
LOG.error("Thread sleep exception in AdbpgOutputFormat class", e1);
}
}
}
closeConnection();
}