in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [668:738]
private void batchAdd(List<RowData> rows) {
long bps = 0;
if (null == rows || rows.isEmpty()) {
return;
}
try {
long start = System.currentTimeMillis();
// TODO add a "copy on conflict" mode directly to replace "copy on conflict" when writemode is "copy" and conflictmode is "upsert"
if (writeMode == 1) { /** copy, this is the default write mode */
// preprocess data to be written to the database using copy writemode
byte[] data = preprocessCopyData(rows);
bps = executeCopy(data);
long end = System.currentTimeMillis();
reportMetric(rows, start, end, bps);
} else if (writeMode == 2) { /** batch upsert */
String sql = adbpgDialect.getUpsertStatement(tableName, fieldNamesStrs, primaryFieldNamesStr, nonPrimaryFieldNamesStr);
executeSqlWithPrepareStatement(sql, rows, rowConverter, false);
} else if (writeMode == 0) { /** batch insert */
String insertSql = adbpgDialect.getInsertIntoStatement(tableName, fieldNamesStrs);
executeSqlWithPrepareStatement(insertSql, rows, rowConverter, false);
} else if (writeMode == 3) { /** merge with streaming server api */
executeMergeWithStreamingServer(rows, streamingServerRowConverter, false);
} else {
LOG.error("Unsupported write mode: " + writeMode);
System.exit(255);
}
closeConnection();
} catch (Exception exception) {
// Get the exception message and check if it is a duplicate key exception
String message = exception.getMessage();
boolean isDuplicateKeyException = false;
if (message != null) {
isDuplicateKeyException = message.contains("duplicate key")
&& message.contains("violates unique constraint")
|| message.contains("ON CONFLICT DO UPDATE");
}
if (isDuplicateKeyException) {
LOG.warn("Batch write failed with duplicate-key exception, will retry with preset conflict-mode.");
} else {
LOG.warn("Batch write failed with exception will retry with preset conflict action. The exception is:", exception);
}
// Batch upsert demotes to single upsert when conflictMode='upsert' or writeMode=2
// note that exception generated by prepared statement stack have one extra layer
for (RowData row : rows) {
// note that exception generated by prepared statement stack have one extra layer
if (exception instanceof BatchUpdateException) {
exception = ((BatchUpdateException) exception).getNextException();
}
if (isDuplicateKeyException) {
if ("strict".equalsIgnoreCase(conflictMode)) { /** conflictMode = 'strict', report error without any action */
throw new RuntimeException("duplicate key value violates unique constraint");
} else if ("upsert".equalsIgnoreCase(conflictMode)) { /** conflictMode = 'upsert', use upsert sql */
LOG.warn("Retrying to replace record with upsert.");
upsertRow(row);
} else if ("ignore".equalsIgnoreCase(conflictMode)) {
LOG.warn("Batch write failed, because preset conflictmode is 'ignore', connector will skip this row");
} else { /** conflictMode = 'update' or any other string, use update sql */
updateRow(row);
}
} else {
// exceptionMode only have "strict" and "ignore", if this is "ignore" return directly without report an expection
if ("strict".equalsIgnoreCase(exceptionMode)) {
LOG.error("Found unexpect exception, will ignore this row.", exception);
System.exit(255);
}
}
}
}
}