private void validateDataTypeMapping()

in hologres-connector-flink-base/src/main/java/com/alibaba/ververica/connectors/hologres/api/table/HologresRowDataConverter.java [127:247]


    /**
     * Verifies that every Flink field type can be mapped onto the Hologres column of the same
     * name.
     *
     * <p>Fields are visited in {@code fieldNames} order. For each one the JDBC type of the
     * Hologres column selects the Flink type roots that are accepted; the Flink type of the field
     * must then be one of them. Array columns accept either a Flink VARCHAR, or a Flink ARRAY
     * whose first child type has one of the accepted element roots.
     *
     * @param fieldTypes Flink logical types, indexed in parallel with {@code fieldNames}
     * @param hologresTableSchema schema used to look up the Hologres column for each field name
     * @throws IllegalArgumentException if a column's JDBC type is not handled by this method, or
     *     if the Flink type of a field is not accepted for its column
     */
    private void validateDataTypeMapping(
            LogicalType[] fieldTypes, HologresTableSchema hologresTableSchema) {
        for (int idx = 0; idx < fieldNames.length; idx++) {
            final Column column = hologresTableSchema.getColumn(fieldNames[idx]);
            final LogicalType flinkType = fieldTypes[idx];

            // Flink type roots that are accepted as-is for this column.
            final LogicalTypeRoot[] acceptedRoots;
            // Set only for array columns: element roots accepted when the Flink type is an ARRAY.
            LogicalTypeRoot[] acceptedElementRoots = null;

            switch (column.getType()) {
                case Types.CHAR:
                case Types.VARCHAR:
                    acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.VARCHAR};
                    break;
                case Types.OTHER:
                    // roaringbitmap columns take binary data; any other OTHER column takes text.
                    acceptedRoots =
                            "roaringbitmap".equals(column.getTypeName())
                                    ? new LogicalTypeRoot[] {LogicalTypeRoot.VARBINARY}
                                    : new LogicalTypeRoot[] {LogicalTypeRoot.VARCHAR};
                    break;
                case Types.BIT:
                case Types.BOOLEAN:
                    acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.BOOLEAN};
                    break;
                case Types.BINARY:
                case Types.VARBINARY:
                    acceptedRoots =
                            new LogicalTypeRoot[] {
                                LogicalTypeRoot.VARBINARY, LogicalTypeRoot.BINARY
                            };
                    break;
                case Types.NUMERIC:
                case Types.DECIMAL:
                    acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.DECIMAL};
                    break;
                case Types.SMALLINT:
                    // Hologres has no TINYINT, so a SMALLINT column also accepts Flink TINYINT.
                    acceptedRoots =
                            new LogicalTypeRoot[] {
                                LogicalTypeRoot.SMALLINT, LogicalTypeRoot.TINYINT
                            };
                    break;
                case Types.INTEGER:
                    acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.INTEGER};
                    break;
                case Types.DATE:
                    acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.DATE};
                    break;
                case Types.BIGINT:
                    acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.BIGINT};
                    break;
                case Types.REAL:
                case Types.FLOAT:
                    acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.FLOAT};
                    break;
                case Types.DOUBLE:
                    acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.DOUBLE};
                    break;
                case Types.TIMESTAMP:
                case Types.TIMESTAMP_WITH_TIMEZONE:
                    acceptedRoots =
                            new LogicalTypeRoot[] {
                                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
                                LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
                                LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
                            };
                    break;
                case Types.ARRAY:
                    acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.VARCHAR};
                    acceptedElementRoots =
                            new LogicalTypeRoot[] {
                                LogicalTypeRoot.BOOLEAN,
                                LogicalTypeRoot.VARCHAR,
                                LogicalTypeRoot.SMALLINT,
                                LogicalTypeRoot.INTEGER,
                                LogicalTypeRoot.FLOAT,
                                LogicalTypeRoot.DOUBLE,
                                LogicalTypeRoot.BIGINT
                            };
                    break;
                default:
                    throw new IllegalArgumentException(
                            String.format(
                                    "Hologres sink does not support column %s with data type %s for now",
                                    fieldNames[idx], column.getTypeName()));
            }

            // The Flink root is read only after the column type is known to be supported.
            final LogicalTypeRoot actualRoot = flinkType.getTypeRoot();
            boolean compatible = false;
            for (LogicalTypeRoot candidate : acceptedRoots) {
                if (actualRoot.equals(candidate)) {
                    compatible = true;
                    break;
                }
            }

            // Array columns: a Flink ARRAY is accepted when its first child has an accepted root.
            if (!compatible
                    && acceptedElementRoots != null
                    && actualRoot.equals(LogicalTypeRoot.ARRAY)) {
                final LogicalTypeRoot elementRoot =
                        flinkType.getChildren().get(0).getTypeRoot();
                for (LogicalTypeRoot candidate : acceptedElementRoots) {
                    if (elementRoot.equals(candidate)) {
                        compatible = true;
                        break;
                    }
                }
            }

            if (!compatible) {
                throw new IllegalArgumentException(
                        String.format(
                                "Column: %s type does not match: flink row type: %s, hologres type: %s",
                                fieldNames[idx], flinkType, column.getTypeName()));
            }
        }
    }