in flink_sink_adbpg_datastream/src/main/java/Adb4PgTableSink.java [573:621]
/**
 * Loads the content of {@code inputStream} into the target table with a
 * {@code COPY ... from STDIN} statement, retrying when the copy fails.
 *
 * <p>Each attempt opens a connection via {@code connect()}, rewinds the stream to
 * the position it had when this method was called, and runs the copy. A failed
 * attempt closes the connection, waits {@code retryWaitTime} and tries again until
 * {@code maxRetryTime} attempts have been made. A failure whose message contains both
 * "duplicate key" and "violates unique constraint" is never retried.
 *
 * <p>At least one attempt is always made, even when {@code maxRetryTime} is zero or
 * negative, so a batch is never dropped silently.
 *
 * @param inputStream the rows to copy; when {@code null} the method does nothing.
 *                    The stream is rewound with {@code reset()} before every attempt
 *                    (assumes it supports mark/reset — verify against callers).
 * @throws SQLException if the copy hits a unique-constraint violation or still fails
 *                      on the last permitted attempt; an underlying
 *                      {@link IOException} is attached as the cause.
 */
private void executeCopy(InputStream inputStream) throws SQLException {
    if (inputStream == null) {
        return;
    }
    // Remember the current position so every attempt can replay the same bytes.
    inputStream.mark(0);
    int attempt = 0;
    while (true) {
        attempt++;
        connect();
        try {
            inputStream.reset();
            inputStream.mark(0);
            // NOTE(review): the pooled connection returned by dataSource.getConnection()
            // is not closed here; confirm that closeConnection() releases it.
            CopyManager manager = new CopyManager((BaseConnection) (dataSource.getConnection().getConnection()));
            String copySql;
            if (caseSensitive) {
                copySql = "COPY \"" + targetSchema + "\".\"" + tableName + "\"( " + fieldNamesCaseSensitive + " )" + " from STDIN";
            } else {
                copySql = "COPY " + targetSchema + "." + tableName + "( " + fieldNames + " )" + " from STDIN";
            }
            manager.copyIn(copySql, inputStream);
            break;
        } catch (SQLException | IOException e) {
            closeConnection();
            String message = e.getMessage();
            boolean uniqueViolation = message != null
                    && message.contains("duplicate key")
                    && message.contains("violates unique constraint");
            // Give up on non-retryable errors or once the attempt budget is used up.
            // 'attempt' is 1-based, so the last permitted attempt is number maxRetryTime.
            if (uniqueViolation || attempt >= maxRetryTime) {
                if (e instanceof SQLException) {
                    throw (SQLException) e;
                }
                // Keep the I/O failure as the cause so the root problem stays visible.
                throw new SQLException("cannot execute copy.", e);
            }
            try {
                Thread.sleep(retryWaitTime);
            } catch (InterruptedException e1) {
                // Restore the interrupt flag so the owning thread can still observe it.
                Thread.currentThread().interrupt();
                LOG.error("Thread sleep exception in AdbpgOutputFormat class", e1);
            } catch (Exception e1) {
                LOG.error("Thread sleep exception in AdbpgOutputFormat class", e1);
            }
        }
    }
    closeConnection();
}