private void updateRow()

in flink_sink_adbpg_datastream/src/main/java/Adb4PgTableSink.java [623:661]


	/**
	 * Updates the target-table record that matches the primary key of {@code row}.
	 *
	 * <p>The columns listed in {@code primaryKeys} form the WHERE clause and every other
	 * schema column forms the SET clause. If the schema has no non-primary-key column
	 * there is nothing to set and the method returns without executing any SQL.
	 *
	 * <p>If the UPDATE throws a {@link SQLException}, the error is logged and the insert
	 * statement produced by {@code getInsertSQL(row)} is executed as a fallback. A failure
	 * of that fallback is logged as well; neither exception is propagated to the caller.
	 *
	 * @param row the row whose values are written to the target table
	 */
	private void updateRow(Row row){
		// Collect the columns that are not part of the primary key; only these are updated.
		Set<String> nonPrimaryKeys = new HashSet<String>();
		for (String field : schema.getFieldNames()) {
			if (!primaryKeys.contains(field)) {
				nonPrimaryKeys.add(field);
			}
		}
		if (nonPrimaryKeys.isEmpty()) {
			// Every column belongs to the primary key, so there is nothing to SET.
			return;
		}

		String whereStatement = StringUtils.join(
				deleteFormat(schema, row, primaryKeys, timeZone, reserveMs), " AND "
		);
		// NOTE(review): deleteFormat is reused here to build the SET assignments;
		// confirm that its output is valid in a SET clause for every value (e.g. nulls).
		String setStatement = StringUtils.join(
				deleteFormat(schema, row, nonPrimaryKeys, timeZone, reserveMs), ","
		);

		// Only the quoting of the schema/table identifiers depends on case sensitivity.
		StringBuilder sql = new StringBuilder("UPDATE ");
		if (caseSensitive) {
			sql.append('"').append(targetSchema).append("\".\"").append(tableName).append('"');
		} else {
			sql.append(targetSchema).append('.').append(tableName);
		}
		sql.append(" SET ").append(setStatement).append(" WHERE ").append(whereStatement);

		String updateSql = sql.toString();
		try {
			executeSql(updateSql);
		} catch (SQLException updateException) {
			LOG.error("Exception in update sql: " + updateSql, updateException);
			// Best-effort fallback: try to write the row with an INSERT instead.
			try {
				executeSql(getInsertSQL(row));
			} catch (SQLException insertException) {
				// Do not swallow silently: at this point the row has not been written at all.
				LOG.error("Exception in insert fallback after failed update sql: " + updateSql,
						insertException);
			}
		}
	}