in flink_sink_adbpg_datastream/src/main/java/Adb4PgTableSink.java [504:542]
private void batchDeleteWithoutPk(List<Row> buffers) {
for (Row row : buffers) {
Joiner joinerOnComma = Joiner.on(" AND ").useForNull("null");
List<String> sub = new ArrayList<String>();
for (int i = 0; i < row.getArity(); i++) {
if (caseSensitive){
if (row.getField(i) == null) {
sub.add(" \"" + schema.getFieldNames().get(i) + "\" is null ");
}
else {
sub.add(" \"" + schema.getFieldNames().get(i) + "\" = " +
toField(row.getField(i)));
}
}
else {
if (row.getField(i) == null) {
sub.add(" " + schema.getFieldNames().get(i) + " is null ");
}
else {
sub.add(" " + schema.getFieldNames().get(i) + " = " +
toField(row.getField(i)));
}
}
}
String sql = null;
if (caseSensitive) {
sql = String.format(DELETE_WITH_KEY_SQL_TPL, "\"" + targetSchema + "\".\"" + tableName + "\"", joinerOnComma.join(sub));
}
else {
sql = String.format(DELETE_WITH_KEY_SQL_TPL, targetSchema + "." + tableName, joinerOnComma.join(sub));
}
try {
executeSql(sql);
}
catch (SQLException e) {
LOG.warn("Exception in delete sql: " + sql, e);
}
}
}