private void batchDeleteWithoutPk()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [563:596]


    /**
     * Issues one DELETE per buffered row on the "without pk" path (per the method name and log
     * message), matching the row on its field values.
     *
     * <p>For each row the indexes of its null fields are collected and handed to
     * {@code adbpgDialect.getDeleteStatementWithNull} together with {@code tableName} and
     * {@code fieldNamesStrs} to obtain the statement text. Presumably the dialect renders the null
     * fields as non-parameterized predicates, because only the non-null fields are bound below
     * (NOTE(review): confirm against the dialect implementation).
     *
     * <ul>
     *   <li>Row without nulls: the row is bound as-is using the shared {@code rowConverter}.
     *   <li>Row with nulls: the non-null fields are copied, in their original order, into a
     *       compact {@link GenericRowData}; a {@link JdbcRowConverter} built from the matching
     *       subset of {@code logicalTypes} binds them.
     * </ul>
     *
     * <p>A {@link SQLException} is logged at WARN level and not rethrown; the rows after the
     * failing one are not attempted. {@code deleteCounter} is incremented by the number of rows
     * whose statement completed without throwing, so rows handled before a failure are still
     * counted.
     *
     * @param buffers rows to delete; iterated in order
     */
    private void batchDeleteWithoutPk(List<RowData> buffers) {
        // Rows whose delete statement returned normally; published to the metric in `finally`
        // so that a failure part-way through does not discard the count of earlier rows.
        int executedRows = 0;
        try {
            for (RowData rowData : buffers) {
                final int arity = rowData.getArity();

                Set<Integer> nullFieldsIndex = new HashSet<>();
                for (int i = 0; i < arity; ++i) {
                    if (rowData.isNullAt(i)) {
                        nullFieldsIndex.add(i);
                    }
                }
                String sql =
                        adbpgDialect.getDeleteStatementWithNull(tableName, fieldNamesStrs, nullFieldsIndex);

                if (nullFieldsIndex.isEmpty()) {
                    executeSqlWithPrepareStatement(sql, Collections.singletonList(rowData), rowConverter, true);
                } else {
                    // Only non-null fields are bound, so build a compact row and a converter
                    // whose types line up with the remaining positions.
                    final int nonNullCount = arity - nullFieldsIndex.size();
                    LogicalType[] types = new LogicalType[nonNullCount];
                    GenericRowData param = new GenericRowData(nonNullCount);
                    for (int i = 0, j = 0; i < arity; ++i) {
                        if (!nullFieldsIndex.contains(i)) {
                            types[j] = logicalTypes[i];
                            param.setField(
                                    j, RowData.createFieldGetter(types[j], i).getFieldOrNull(rowData));
                            j++;
                        }
                    }
                    JdbcRowConverter converter = new JdbcRowConverter(types);
                    executeSqlWithPrepareStatement(sql, Collections.singletonList(param), converter, true);
                }
                executedRows++;
            }
        } catch (SQLException e) {
            LOG.warn("Exception in delete sql without pk: ", e);
        } finally {
            deleteCounter.inc(executedRows);
        }
    }