in seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java [76:155]
/**
 * Checks whether a field value is compatible with the declared SeaTunnel data type.
 *
 * <p>Rules applied, by SQL type of {@code dataType}:
 *
 * <ul>
 *   <li>A {@code null} field, or a data type whose SQL type is {@code NULL}, is always accepted.
 *   <li>Scalar types (boolean, numeric, temporal, string, decimal, bytes) require the field's
 *       runtime class to be exactly {@code dataType.getTypeClass()}.
 *   <li>{@code ARRAY}: the field must be an {@code Object[]}; an empty array is accepted,
 *       otherwise only the first element is checked against the element type.
 *   <li>{@code MAP}: the field must be a {@link Map}; an empty map is accepted, otherwise only
 *       the first entry returned by the map's iterator is checked. A key or value wrapped in
 *       {@code scala.Some} is unwrapped before being checked.
 *   <li>{@code ROW}: the field must be a {@code SeaTunnelRow} carrying at least as many fields
 *       as the row type declares; each declared field is then checked recursively.
 *   <li>Any other SQL type is rejected.
 * </ul>
 *
 * @param field the value to check, may be {@code null}
 * @param dataType the declared type the value is checked against
 * @return {@code true} if the value passes the checks above, {@code false} otherwise
 */
protected boolean validate(Object field, SeaTunnelDataType<?> dataType) {
    if (field == null || dataType.getSqlType() == SqlType.NULL) {
        return true;
    }
    SqlType sqlType = dataType.getSqlType();
    switch (sqlType) {
        case BOOLEAN:
        case TINYINT:
        case SMALLINT:
        case INT:
        case BIGINT:
        case DATE:
        case TIME:
        case TIMESTAMP:
        case TIMESTAMP_TZ:
        case FLOAT:
        case DOUBLE:
        case STRING:
        case DECIMAL:
        case BYTES:
            // Exact class match on purpose: subclasses of the declared type class are rejected.
            boolean isEq = (dataType.getTypeClass() == field.getClass());
            if (!isEq) {
                log.error(
                        String.format(
                                "dataType.getTypeClass is %s, but field.getClass is %s",
                                dataType.getTypeClass(), field.getClass()));
            }
            return isEq;
        case ARRAY:
            if (!(field instanceof Object[])) {
                log.error(
                        String.format(
                                "field type is %s, not instanceof java.lang.Object[]",
                                field.getClass()));
                return false;
            }
            ArrayType<?, ?> arrayType = (ArrayType<?, ?>) dataType;
            Object[] arrayField = (Object[]) field;
            if (arrayField.length == 0) {
                return true;
            }
            // Only the first element is sampled; the remaining elements are not inspected.
            return validate(arrayField[0], arrayType.getElementType());
        case MAP:
            if (!(field instanceof Map)) {
                log.error(
                        String.format(
                                "field type is %s, not instanceof java.util.Map",
                                field.getClass()));
                return false;
            }
            MapType<?, ?> mapType = (MapType<?, ?>) dataType;
            Map<?, ?> mapField = (Map<?, ?>) field;
            if (mapField.isEmpty()) {
                return true;
            }
            // Only the first entry is sampled; the map is known to be non-empty here.
            Map.Entry<?, ?> entry = mapField.entrySet().iterator().next();
            Object key = entry.getKey();
            if (key instanceof scala.Some) {
                key = ((scala.Some<?>) key).get();
            }
            Object value = entry.getValue();
            if (value instanceof scala.Some) {
                value = ((scala.Some<?>) value).get();
            }
            return validate(key, mapType.getKeyType())
                    && validate(value, mapType.getValueType());
        case ROW:
            if (!(field instanceof SeaTunnelRow)) {
                log.error(
                        String.format(
                                "field type is %s, not instanceof SeaTunnelRow",
                                field.getClass()));
                return false;
            }
            SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) dataType).getFieldTypes();
            SeaTunnelRow seaTunnelRow = (SeaTunnelRow) field;
            // A row with fewer fields than the declared type cannot match; report it as a
            // validation failure instead of asking the row for a position it does not have.
            if (seaTunnelRow.getArity() < fieldTypes.length) {
                log.error(
                        String.format(
                                "row has %d fields, but row type declares %d",
                                seaTunnelRow.getArity(), fieldTypes.length));
                return false;
            }
            for (int i = 0; i < fieldTypes.length; i++) {
                if (!validate(seaTunnelRow.getField(i), fieldTypes[i])) {
                    return false;
                }
            }
            return true;
        default:
            return false;
    }
}