in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialect.java [49:105]
public void validate(RowType rowType) throws ValidationException {
    // Checks every field of the given row type against what this dialect declares as
    // supported:
    //   * the field's type root must be contained in supportedTypes();
    //   * a VARBINARY field must have length Integer.MAX_VALUE;
    //   * DECIMAL / TIMESTAMP precisions must fall inside the dialect's precision ranges.
    // A ValidationException is thrown for the first field that violates one of these rules.
    // An IllegalStateException is thrown when a DECIMAL / TIMESTAMP field passes the type
    // check but the corresponding precision range Optional is empty.
    for (RowType.RowField field : rowType.getFields()) {
        // TODO: We can't convert VARBINARY(n) data type to
        //  PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
        //  LegacyTypeInfoDataTypeConverter when n is smaller
        //  than Integer.MAX_VALUE
        if (!supportedTypes().contains(field.getType().getTypeRoot())
                || (field.getType() instanceof VarBinaryType
                        && Integer.MAX_VALUE
                                != ((VarBinaryType) field.getType()).getLength())) {
            throw new ValidationException(
                    format(
                            "The %s dialect doesn't support type: %s.",
                            dialectName(), field.getType()));
        }

        if (field.getType() instanceof DecimalType) {
            // The type root was accepted above, so a missing range is treated as an
            // illegal dialect state rather than a user-facing validation problem.
            Range range =
                    decimalPrecisionRange()
                            .orElseThrow(
                                    () ->
                                            new IllegalStateException(
                                                    format(
                                                            "JdbcDialect %s supports DECIMAL type but no precision range has been set. "
                                                                    + "Ensure AbstractDialect#decimalPrecisionRange() is overriden to return a valid Range",
                                                            dialectName())));
            int precision = ((DecimalType) field.getType()).getPrecision();
            if (precision > range.max || precision < range.min) {
                throw new ValidationException(
                        format(
                                "The precision of field '%s' is out of the DECIMAL "
                                        + "precision range [%d, %d] supported by %s dialect.",
                                field.getName(), range.min, range.max, dialectName()));
            }
        }

        if (field.getType() instanceof TimestampType) {
            // Same contract as the DECIMAL branch. The message keeps a space between the
            // two sentences so it reads the same way as the DECIMAL one.
            Range range =
                    timestampPrecisionRange()
                            .orElseThrow(
                                    () ->
                                            new IllegalStateException(
                                                    format(
                                                            "JdbcDialect %s supports TIMESTAMP type but no precision range has been set. "
                                                                    + "Ensure AbstractDialect#timestampPrecisionRange() is overriden to return a valid Range",
                                                            dialectName())));
            int precision = ((TimestampType) field.getType()).getPrecision();
            if (precision > range.max || precision < range.min) {
                throw new ValidationException(
                        format(
                                "The precision of field '%s' is out of the TIMESTAMP "
                                        + "precision range [%d, %d] supported by %s dialect.",
                                field.getName(), range.min, range.max, dialectName()));
            }
        }
    }
}