in flink_sink_adbpg_datastream/src/main/java/Adb4PgTableSink.java [623:661]
private void updateRow(Row row){
//get non primary keys
ArrayList<String> allFields = schema.getFieldNames();
Set<String> nonPrimaryKeys = new HashSet<String>();
for(String field : allFields){
if(!primaryKeys.contains(field)){
nonPrimaryKeys.add(field);
}
}
if(nonPrimaryKeys.size() == 0){
return;
}
String whereStatement = StringUtils.join(
deleteFormat(schema, row, primaryKeys, timeZone, reserveMs), " AND "
);
String setStatement = StringUtils.join(
deleteFormat(schema, row, nonPrimaryKeys, timeZone, reserveMs), ","
);
StringBuilder sql = new StringBuilder();
if (caseSensitive) {
sql.append("UPDATE \"").append(targetSchema).append("\".\"").append(tableName).append("\" SET ").append(setStatement).append(" WHERE ").append(whereStatement);
}
else {
sql.append("UPDATE ").append(targetSchema).append(".").append(tableName).append(" SET ").append(setStatement).append(" WHERE ").append(whereStatement);
}
try{
executeSql(sql.toString());
} catch (SQLException updateException){
LOG.error("Exception in update sql: "+sql.toString(), updateException);
try {
executeSql(getInsertSQL(row));
}
catch (SQLException e) {
}
}
}