in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/utils/AdbpgDialect.java [150:182]
public String getCopyStatement(String tableName, String[] fieldNames, String file, String conflictMode, String delimiter, String copyFormat, String copyQuote) {
String columns =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String conflictAction;
String format = "";
String quote = "";
if ("ignore".equalsIgnoreCase(conflictMode) /** if conflictmode is not "upsert", use normal copy statement or insert statement */
|| "strict".equalsIgnoreCase(conflictMode)
|| "update".equalsIgnoreCase(conflictMode)) {
conflictAction = "";
} else { /** if conflictmode is "upsert", use copy-on-conflict statement or insert-on-conflict statement */
conflictAction = " DO on conflict DO update";
}
if ("csv".equalsIgnoreCase(copyFormat)) {
quote = " quote '" + copyQuote + "'";
format = " " + copyFormat + " ESCAPE '\\'" + quote;
}
return "COPY "
+ quoteIdentifier(targetSchema)
+ "."
+ quoteIdentifier(tableName)
+ "("
+ columns
+ ")"
+ " FROM "
+ file
+ " DELIMITER '"+ delimiter +"'" // DELIMITER '\t'
+ " NULL 'null'"
+ format
+ conflictAction;
}