in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [963:1024]
/**
 * Writes {@code data} to the target table by streaming it through
 * {@code copyManager.copyIn} with the COPY statement produced by
 * {@code adbpgDialect.getCopyStatement(...)} (source given as "STDIN").
 *
 * <p>The connection ({@code baseConn}) and {@code copyManager} are lazily
 * (re)created at the start of every attempt. Failures are handled as follows:
 * <ul>
 *   <li>A {@code PSQLException} from {@code copyIn} resets {@code copyManager}
 *       and is retried immediately while {@code retryTime <= maxRetryTime};
 *       once the attempts are used up the exception is rethrown instead of
 *       being swallowed.</li>
 *   <li>A duplicate-key / unique-constraint violation is never retried and is
 *       propagated to the caller right away.</li>
 *   <li>Any other {@code SQLException} / {@code IOException} is retried after
 *       sleeping {@code retryWaitTime}, until
 *       {@code retryTime >= maxRetryTime - 1}, at which point it is rethrown.</li>
 * </ul>
 *
 * <p>NOTE(review): the two retry paths use different attempt limits
 * ({@code maxRetryTime + 1} vs. {@code maxRetryTime - 1}); this is kept as in
 * the previous implementation - confirm whether it is intentional. Also note
 * that a negative {@code maxRetryTime} skips the loop entirely, so nothing is
 * written.
 *
 * @param data the already serialized rows to send; must not be {@code null}
 * @return {@code data.length} multiplied by the number of attempts made
 * @throws SQLException if the copy fails with a non-retryable error or the
 *         retries are exhausted
 * @throws IOException if an I/O failure persists until the retries are exhausted
 */
private long executeCopy(byte[] data) throws SQLException, IOException {
    long bps = data.length;
    int retryTime = 0;
    while (retryTime++ <= maxRetryTime) {
        try {
            if (baseConn == null || baseConn.isClosed()) {
                LOG.info("recreate baseConn within executeCopy");
                DruidPooledConnection rawConn = dataSource.getConnection();
                baseConn = (BaseConnection) (rawConn.getConnection());
            }
            if (copyManager == null) {
                LOG.info("recreate copyManager within executeCopy");
                copyManager = new CopyManager(baseConn);
            }
            String sql = adbpgDialect.getCopyStatement(tableName, fieldNamesStrs, "STDIN", conflictMode, delimiter, copyFormat, copyQuote);
            LOG.info("Writing data with sql:" + sql);
            // A fresh stream per attempt guarantees every retry re-sends the payload from byte 0.
            InputStream inputStream = new ByteArrayInputStream(data);
            try {
                copyManager.copyIn(sql, inputStream);
            } catch (PSQLException copyFailure) {
                LOG.error("Error during copyIn.", copyFailure);
                String message = copyFailure.getMessage();
                if (message != null && message.contains("duplicate key")
                        && message.contains("violates unique constraint")) {
                    // Retrying cannot fix a constraint violation. The enclosing handler
                    // recognises the same message and propagates it to the caller.
                    throw copyFailure;
                }
                if (message != null && message.contains("Database connection failed")) {
                    // The connection is still open but no longer usable (isValid timeout is in seconds).
                    if (baseConn != null && !baseConn.isClosed() && !baseConn.isValid(1)) {
                        if (dataSource != null) {
                            // Discard the broken connection explicitly; otherwise druid will try
                            // to recycle the invalid base connection and print unusable log.
                            dataSource.discardConnection(baseConn);
                        }
                    }
                }
                copyManager = null; // force re-creation on the next attempt
                if (retryTime > maxRetryTime) {
                    LOG.error("Failed to execute copyIn after " + maxRetryTime + " retries");
                    // Do not report success for data that was never written. At this point
                    // retryTime is maxRetryTime + 1, so the enclosing handler rethrows as well.
                    throw copyFailure;
                }
                LOG.info("Reconnecting and retrying.");
                continue; // Retry the copyIn operation
            }
            break;
        } catch (SQLException | IOException e) {
            boolean duplicateKey = e.getMessage() != null
                    && e.getMessage().contains("duplicate key")
                    && e.getMessage().contains("violates unique constraint");
            if (duplicateKey || retryTime >= maxRetryTime - 1) {
                throw e;
            }
            try {
                Thread.sleep(retryWaitTime);
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag and stop retrying so cancellation is not delayed.
                Thread.currentThread().interrupt();
                LOG.error("Thread sleep exception in AdbpgOutputFormat class", interrupted);
                throw e;
            } catch (Exception sleepFailure) {
                // e.g. an invalid wait time; keep the previous best-effort behavior and retry.
                LOG.error("Thread sleep exception in AdbpgOutputFormat class", sleepFailure);
            }
        }
    }
    return bps * retryTime;
}