private DataType extractFieldType()

in paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java [149:206]


    /**
     * Resolves the {@link DataType} for a Debezium schema field.
     *
     * <p>The decision is driven by {@code field.type()} and, for {@code int32}, {@code int64}
     * and {@code bytes}, refined by {@code field.name()}:
     *
     * <ul>
     *   <li>{@code array} becomes an array of strings; {@code map} and {@code struct} become a
     *       string-to-string map.
     *   <li>{@code int32} named {@code Date.SCHEMA_NAME} becomes a date; {@code int64} named
     *       {@code MicroTimestamp.SCHEMA_NAME} / {@code MicroTime.SCHEMA_NAME} becomes a
     *       timestamp / time with precision 6.
     *   <li>{@code bytes} named {@code decimalLogicalName()} becomes a decimal built from the
     *       {@code connect.decimal.precision} and {@code scale} parameters; {@code bytes} named
     *       {@code Bits.LOGICAL_NAME} becomes a boolean or a fixed-size binary depending on the
     *       {@code length} parameter; any other {@code bytes} field becomes a byte array.
     *   <li>Any type not listed in the switch falls back to string.
     * </ul>
     *
     * @param field the field whose schema type, schema name and parameters are inspected
     * @return the resolved data type
     */
    private DataType extractFieldType(DebeziumEvent.Field field) {
        final String schemaType = field.type();
        switch (schemaType) {
            case "boolean":
                return DataTypes.BOOLEAN();
            case "string":
                return DataTypes.STRING();
            case "int8":
                return DataTypes.TINYINT();
            case "int16":
                return DataTypes.SMALLINT();
            case "int32":
                return Date.SCHEMA_NAME.equals(field.name()) ? DataTypes.DATE() : DataTypes.INT();
            case "int64":
                if (MicroTimestamp.SCHEMA_NAME.equals(field.name())) {
                    return DataTypes.TIMESTAMP(6);
                }
                if (MicroTime.SCHEMA_NAME.equals(field.name())) {
                    return DataTypes.TIME(6);
                }
                return DataTypes.BIGINT();
            case "float":
            case "float32":
                return DataTypes.FLOAT();
            case "double":
            case "float64":
                return DataTypes.DOUBLE();
            case "array":
                return DataTypes.ARRAY(DataTypes.STRING());
            case "map":
            case "struct":
                return DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING());
            case "bytes":
                {
                    if (decimalLogicalName().equals(field.name())) {
                        int decimalPrecision =
                                field.parameters().get("connect.decimal.precision").asInt();
                        int decimalScale = field.parameters().get("scale").asInt();
                        return DataTypes.DECIMAL(decimalPrecision, decimalScale);
                    }
                    if (!Bits.LOGICAL_NAME.equals(field.name())) {
                        // Neither the decimal nor the bits schema name (including a null name):
                        // keep the raw bytes.
                        return DataTypes.BYTES();
                    }

                    String declaredLength = field.parameters().get("length").asText();
                    if (StringUtils.isNullOrWhitespaceOnly(declaredLength)) {
                        // No usable length declared: treated the same as a length of 1.
                        return DataTypes.BOOLEAN();
                    }
                    int bitLength = Integer.parseInt(declaredLength);
                    if (bitLength == 1) {
                        return DataTypes.BOOLEAN();
                    }
                    // Divide by 8 rounding up (presumably bits -> bytes). Integer.MAX_VALUE is
                    // special-cased because adding 7 to it would overflow.
                    int byteWidth =
                            bitLength == Integer.MAX_VALUE ? bitLength / 8 : (bitLength + 7) / 8;
                    return DataTypes.BINARY(byteWidth);
                }
            default:
                // Unrecognized schema types are represented as strings.
                return DataTypes.STRING();
        }
    }