in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [420:458]
private String constructDupKey(RowData row, List<Integer> pkIndex) {
String dupKey = "";
for (int i : pkIndex) {
if (row.isNullAt(i)) {
dupKey += "null#";
continue;
}
LogicalType t = logicalTypes[i];
String valuestr;
if (t instanceof BooleanType) {
boolean value = row.getBoolean(i);
valuestr = value ? "'true'" : "'false'";
} else if (t instanceof TimestampType) {
Timestamp value = row.getTimestamp(i, 8).toTimestamp();
valuestr = "'" + DateUtil.timeStamp2String((Timestamp) value, timeZone, reserveMs) + "'";
} else if (t instanceof VarCharType || t instanceof CharType) {
valuestr = toField(row.getString(i).toString());
} else if (t instanceof FloatType) {
valuestr = row.getFloat(i) + "";
} else if (t instanceof DoubleType) {
valuestr = row.getDouble(i) + "";
} else if (t instanceof IntType) {
valuestr = row.getInt(i) + "";
} else if (t instanceof SmallIntType) {
valuestr = row.getShort(i) + "";
} else if (t instanceof TinyIntType) {
valuestr = row.getByte(i) + "";
} else if (t instanceof BigIntType) {
valuestr = row.getLong(i) + "";
} else if (t instanceof DecimalType) {
DecimalType dt = (DecimalType) t;
valuestr = row.getDecimal(i, dt.getPrecision(), dt.getScale()).toString();
} else {
throw new RuntimeException("unsupported data type:" + t.toString() + ", please contact developer:wangheyang.why@alibaba-inc.com");
}
dupKey += valuestr + "#";
}
return dupKey;
}