in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [563:596]
private void batchDeleteWithoutPk(List<RowData> buffers) {
try {
for (RowData rowData : buffers) {
Set<Integer> nullFieldsIndex = new HashSet<>();
for (int i = 0; i < rowData.getArity(); ++i) {
if (rowData.isNullAt(i)) {
nullFieldsIndex.add(i);
}
}
String sql =
adbpgDialect.getDeleteStatementWithNull(tableName, fieldNamesStrs, nullFieldsIndex);
if (!nullFieldsIndex.isEmpty()) {
LogicalType[] types = new LogicalType[rowData.getArity() - nullFieldsIndex.size()];
GenericRowData param = new GenericRowData(rowData.getArity() - nullFieldsIndex.size());
for (int i = 0, j = 0; i < rowData.getArity(); ++i) {
if (!nullFieldsIndex.contains(i)) {
types[j] = logicalTypes[i];
param.setField(
j, RowData.createFieldGetter(types[j], i).getFieldOrNull(rowData));
j++;
}
}
JdbcRowConverter converter = new JdbcRowConverter(types);
executeSqlWithPrepareStatement(sql, Collections.singletonList(param), converter, true);
} else {
executeSqlWithPrepareStatement(sql, Collections.singletonList(rowData), rowConverter, true);
}
}
deleteCounter.inc(buffers.size());
} catch (SQLException e) {
LOG.warn("Exception in delete sql without pk: ", e);
}
}