private void batchDeleteWithoutPk()

in flink_sink_adbpg_datastream/src/main/java/Adb4PgTableSink.java [504:542]


	private void batchDeleteWithoutPk(List<Row> buffers) {
		for (Row row : buffers) {
			Joiner joinerOnComma = Joiner.on(" AND ").useForNull("null");
			List<String> sub = new ArrayList<String>();
			for (int i = 0; i < row.getArity(); i++) {
				if (caseSensitive){
					if (row.getField(i) == null) {
						sub.add(" \"" + schema.getFieldNames().get(i) + "\" is null ");
					}
					else {
						sub.add(" \"" + schema.getFieldNames().get(i) + "\" = " +
								toField(row.getField(i)));
					}
				}
				else {
					if (row.getField(i) == null) {
						sub.add(" " + schema.getFieldNames().get(i) + " is null ");
					}
					else {
						sub.add(" " + schema.getFieldNames().get(i) + " = " +
								toField(row.getField(i)));
					}
				}
			}
			String sql = null;
			if (caseSensitive) {
				sql = String.format(DELETE_WITH_KEY_SQL_TPL, "\"" + targetSchema + "\".\"" + tableName + "\"", joinerOnComma.join(sub));
			}
			else {
				sql = String.format(DELETE_WITH_KEY_SQL_TPL, targetSchema + "." + tableName, joinerOnComma.join(sub));
			}
			try {
				executeSql(sql);
			}
			catch (SQLException e) {
				LOG.warn("Exception in delete sql: " + sql, e);
			}
		}
	}