private String constructDupKey()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [420:458]


    /**
     * Builds a string key from the selected columns of {@code row}.
     *
     * <p>Each column position in {@code pkIndex} contributes one segment terminated by
     * {@code '#'}, appended in iteration order. A null column contributes the literal
     * {@code null#}. Non-null columns are rendered according to the logical type recorded in
     * {@code logicalTypes} for that position: booleans and timestamps are wrapped in single
     * quotes, character types are passed through {@code toField}, and numeric types use their
     * default string form.
     *
     * @param row the row whose columns are read
     * @param pkIndex positions of the columns that make up the key
     * @return the concatenated key; an empty string when {@code pkIndex} is empty
     * @throws RuntimeException if a selected column has a logical type not handled here
     */
    private String constructDupKey(RowData row, List<Integer> pkIndex) {
        // StringBuilder avoids re-copying the accumulated key on every appended segment.
        StringBuilder dupKey = new StringBuilder();
        for (int i : pkIndex) {
            if (row.isNullAt(i)) {
                dupKey.append("null#");
                continue;
            }
            LogicalType t = logicalTypes[i];
            String valuestr;
            if (t instanceof BooleanType) {
                valuestr = row.getBoolean(i) ? "'true'" : "'false'";
            } else if (t instanceof TimestampType) {
                // Read with the column's declared precision rather than a fixed one:
                // RowData#getTimestamp uses the precision to pick the storage layout
                // (compact for precision <= 3), so a mismatched value can decode a
                // binary-backed row incorrectly.
                int precision = ((TimestampType) t).getPrecision();
                Timestamp value = row.getTimestamp(i, precision).toTimestamp();
                valuestr = "'" + DateUtil.timeStamp2String(value, timeZone, reserveMs) + "'";
            } else if (t instanceof VarCharType || t instanceof CharType) {
                // NOTE(review): toField presumably quotes/escapes the text — confirm.
                valuestr = toField(row.getString(i).toString());
            } else if (t instanceof FloatType) {
                valuestr = String.valueOf(row.getFloat(i));
            } else if (t instanceof DoubleType) {
                valuestr = String.valueOf(row.getDouble(i));
            } else if (t instanceof IntType) {
                valuestr = String.valueOf(row.getInt(i));
            } else if (t instanceof SmallIntType) {
                valuestr = String.valueOf(row.getShort(i));
            } else if (t instanceof TinyIntType) {
                valuestr = String.valueOf(row.getByte(i));
            } else if (t instanceof BigIntType) {
                valuestr = String.valueOf(row.getLong(i));
            } else if (t instanceof DecimalType) {
                DecimalType dt = (DecimalType) t;
                valuestr = row.getDecimal(i, dt.getPrecision(), dt.getScale()).toString();
            } else {
                throw new RuntimeException("unsupported data type:" + t.toString() + ", please contact developer:wangheyang.why@alibaba-inc.com");
            }
            dupKey.append(valuestr).append('#');
        }
        return dupKey.toString();
    }