private long executeCopy()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [963:1024]


    /**
     * Sends one serialized batch to the database with a {@code COPY ... FROM STDIN} statement,
     * re-establishing the connection / {@code CopyManager} and retrying on failure.
     *
     * <p>Two retry paths exist:
     * <ul>
     *   <li>A {@code PSQLException} raised by {@code copyIn} resets {@code copyManager} (and discards
     *       the base connection when the error message reports a failed database connection and the
     *       connection no longer validates) and retries immediately, while
     *       {@code retryTime <= maxRetryTime}.</li>
     *   <li>Any other {@code SQLException} / {@code IOException} sleeps {@code retryWaitTime} and
     *       retries, unless {@code retryTime >= maxRetryTime - 1}.</li>
     * </ul>
     * NOTE(review): the two paths use different retry budgets; kept as-is, confirm whether intended.
     *
     * <p>Unique-constraint violations ("duplicate key ... violates unique constraint") are never
     * retried on either path; they are rethrown to the caller right away.
     *
     * @param data the payload to stream to the server as the COPY input
     * @return {@code data.length} multiplied by the number of attempts that were made
     *         (presumably a bytes-sent metric; verify against the caller)
     * @throws SQLException if the copy fails with a unique-constraint violation, or still fails
     *                      once the retry budget is exhausted
     * @throws IOException  if streaming the payload fails and the retry budget is exhausted
     */
    private long executeCopy(byte[] data) throws SQLException, IOException {
        long bps = data.length;
        int retryTime = 0;
        while (retryTime++ <= maxRetryTime) {
            try {
                // A fresh stream per attempt guarantees that every retry resends the payload from byte 0.
                InputStream inputStream = new ByteArrayInputStream(data);
                if (baseConn == null || baseConn.isClosed()) {
                    LOG.info("recreate baseConn within executeCopy");
                    DruidPooledConnection rawConn = dataSource.getConnection();
                    baseConn = (BaseConnection) (rawConn.getConnection());
                }
                if (copyManager == null) {
                    LOG.info("recreate copyManager within executeCopy");
                    copyManager = new CopyManager(baseConn);
                }
                String sql = adbpgDialect.getCopyStatement(tableName, fieldNamesStrs, "STDIN", conflictMode, delimiter, copyFormat, copyQuote);
                LOG.info("Writing data with sql:" + sql);
                try {
                    copyManager.copyIn(sql, inputStream);
                } catch (PSQLException e) {
                    String message = e.getMessage();
                    boolean duplicateKey = message != null
                            && message.contains("duplicate key")
                            && message.contains("violates unique constraint");
                    if (duplicateKey) {
                        // Retrying cannot fix a constraint violation. Rethrow; the outer catch applies
                        // the same check and propagates it to the caller without sleeping.
                        throw e;
                    }
                    LOG.error("Error during copyIn.", e);
                    if (message != null && message.contains("Database connection failed")) {
                        // Reacquire the connection
                        if (baseConn != null && !baseConn.isClosed() && !baseConn.isValid(1)) { // isValid(timeout), timeout in seconds
                            // Discard the invalid connection
                            if (dataSource != null) {
                                // Close and discard the invalid connection first; otherwise druid will
                                // try to recycle the invalid base connection and print unusable logs.
                                dataSource.discardConnection(baseConn);
                            }
                        }
                    }
                    copyManager = null; // force a new CopyManager on the next attempt
                    if (retryTime > maxRetryTime) {
                        LOG.error("Failed to execute copyIn after " + maxRetryTime + " retries");
                        // Out of retries: propagate the failure instead of returning as if the batch
                        // had been written (the outer catch rethrows because retryTime is past its limit).
                        throw e;
                    }
                    LOG.info("Reconnecting and retrying.");
                    continue; // Retry the copyIn operation
                }
                break;
            } catch (SQLException | IOException e) {
                if ((e.getMessage() != null && e.getMessage().contains("duplicate key")
                        && e.getMessage().contains("violates unique constraint"))
                        || retryTime >= maxRetryTime - 1) {
                    throw e;
                }
                try {
                    Thread.sleep(retryWaitTime);
                } catch (InterruptedException interrupted) {
                    LOG.error("Thread sleep exception in AdbpgOutputFormat class", interrupted);
                    // Preserve the interrupt for the caller and stop retrying: the thread was asked to stop.
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
        return bps * retryTime;
    }