in hologres-connector-flink-base/src/main/java/com/alibaba/ververica/connectors/hologres/api/table/HologresRowDataConverter.java [127:247]
/**
 * Verifies that every Flink field type is compatible with the type of the Hologres column
 * looked up under the same field name.
 *
 * <p>For each entry of {@code fieldNames} (declared outside this method) the JDBC type code of
 * the Hologres column selects the set of Flink {@link LogicalTypeRoot}s that are accepted.
 * Hologres array columns additionally accept a Flink ARRAY whose element type is one of
 * BOOLEAN, VARCHAR, SMALLINT, INTEGER, FLOAT, DOUBLE or BIGINT.
 *
 * @param fieldTypes Flink logical types, indexed in parallel with {@code fieldNames}
 * @param hologresTableSchema schema used to look up the Hologres column for each field name
 * @throws IllegalArgumentException if a Hologres column has a JDBC type that is not handled
 *     here, or if the Flink type of a field is not accepted for its Hologres column type
 */
private void validateDataTypeMapping(
        LogicalType[] fieldTypes, HologresTableSchema hologresTableSchema) {
    for (int idx = 0; idx < fieldNames.length; idx++) {
        final String columnName = fieldNames[idx];
        final Column hologresColumn = hologresTableSchema.getColumn(columnName);
        final LogicalType flinkType = fieldTypes[idx];

        // Step 1: pick the Flink type roots accepted for this Hologres column type.
        final LogicalTypeRoot[] acceptedRoots;
        boolean arrayAccepted = false;
        switch (hologresColumn.getType()) {
            case Types.CHAR:
            case Types.VARCHAR:
                acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.VARCHAR};
                break;
            case Types.OTHER:
                // roaringbitmap columns are matched against binary data, every other
                // "OTHER" type against a string.
                if ("roaringbitmap".equals(hologresColumn.getTypeName())) {
                    acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.VARBINARY};
                } else {
                    acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.VARCHAR};
                }
                break;
            case Types.BIT:
            case Types.BOOLEAN:
                acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.BOOLEAN};
                break;
            case Types.BINARY:
            case Types.VARBINARY:
                acceptedRoots =
                        new LogicalTypeRoot[] {
                            LogicalTypeRoot.VARBINARY, LogicalTypeRoot.BINARY
                        };
                break;
            case Types.NUMERIC:
            case Types.DECIMAL:
                acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.DECIMAL};
                break;
            case Types.SMALLINT:
                // Hologres does not support TINYINT, so a SMALLINT column must also be
                // compatible with a Flink TINYINT.
                acceptedRoots =
                        new LogicalTypeRoot[] {
                            LogicalTypeRoot.SMALLINT, LogicalTypeRoot.TINYINT
                        };
                break;
            case Types.INTEGER:
                acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.INTEGER};
                break;
            case Types.DATE:
                acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.DATE};
                break;
            case Types.BIGINT:
                acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.BIGINT};
                break;
            case Types.REAL:
            case Types.FLOAT:
                acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.FLOAT};
                break;
            case Types.DOUBLE:
                acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.DOUBLE};
                break;
            case Types.TIMESTAMP:
            case Types.TIMESTAMP_WITH_TIMEZONE:
                acceptedRoots =
                        new LogicalTypeRoot[] {
                            LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
                            LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
                            LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
                        };
                break;
            case Types.ARRAY:
                // A plain VARCHAR is accepted as well as a Flink ARRAY (checked below).
                acceptedRoots = new LogicalTypeRoot[] {LogicalTypeRoot.VARCHAR};
                arrayAccepted = true;
                break;
            default:
                throw new IllegalArgumentException(
                        String.format(
                                "Hologres sink does not support column %s with data type %s for now",
                                columnName, hologresColumn.getTypeName()));
        }

        // Step 2: compare the Flink type root against the accepted candidates.
        final LogicalTypeRoot flinkRoot = flinkType.getTypeRoot();
        boolean matched = false;
        for (LogicalTypeRoot candidate : acceptedRoots) {
            if (flinkRoot.equals(candidate)) {
                matched = true;
                break;
            }
        }

        // Step 3: for Hologres arrays, a Flink ARRAY is fine when its element type is one
        // of the supported element types. Children are only inspected for ARRAY roots.
        if (!matched && arrayAccepted && flinkRoot.equals(LogicalTypeRoot.ARRAY)) {
            final LogicalTypeRoot elementRoot = flinkType.getChildren().get(0).getTypeRoot();
            switch (elementRoot) {
                case BOOLEAN:
                case VARCHAR:
                case SMALLINT:
                case INTEGER:
                case FLOAT:
                case DOUBLE:
                case BIGINT:
                    matched = true;
                    break;
                default:
                    matched = false;
                    break;
            }
        }

        if (matched) {
            continue;
        }
        throw new IllegalArgumentException(
                String.format(
                        "Column: %s type does not match: flink row type: %s, hologres type: %s",
                        columnName, flinkType, hologresColumn.getTypeName()));
    }
}