in seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvDeserializationSchema.java [229:336]
/**
 * Converts one raw text token into the Java value that corresponds to {@code fieldType}.
 *
 * <p>Blank input (as decided by {@code StringUtils.isBlank}) always yields {@code null},
 * whatever the target type is. Composite types are split with entries of {@code separators}:
 *
 * <ul>
 *   <li>ARRAY elements and MAP entries are split on {@code separators[level + 1]};
 *   <li>the key and value inside one MAP entry are split on {@code separators[level + 2]};
 *   <li>ROW fields are split by {@code splitLineBySeaTunnelRowType} at {@code level + 1}.
 * </ul>
 *
 * <p>Nested values are converted recursively with {@code level + 1}.
 *
 * @param field raw text of the field; may be blank
 * @param fieldType target SeaTunnel type of the field
 * @param level current nesting depth, used as the base index into {@code separators}
 * @param fieldName field name forwarded to the date/timestamp parsers and to the
 *     unsupported-type error; nested ROW fields are passed as {@code parent.child}
 * @return the converted value, or {@code null} when {@code field} is blank or the type is NULL
 * @throws SeaTunnelCsvFormatException if an ARRAY has an element type that is not handled here
 * @throws NumberFormatException if a numeric field cannot be parsed by the JDK parser
 */
private Object convert(
        String field, SeaTunnelDataType<?> fieldType, int level, String fieldName) {
    if (StringUtils.isBlank(field)) {
        return null;
    }
    switch (fieldType.getSqlType()) {
        case ARRAY:
            SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
            String[] elements = field.split(separators[level + 1]);
            ArrayList<Object> objectArrayList = new ArrayList<>(elements.length);
            for (String element : elements) {
                objectArrayList.add(convert(element, elementType, level + 1, fieldName));
            }
            // The result has to be an array of the element's boxed Java type, so the
            // zero-length prototype passed to toArray selects the runtime component type.
            switch (elementType.getSqlType()) {
                case STRING:
                    return objectArrayList.toArray(new String[0]);
                case BOOLEAN:
                    return objectArrayList.toArray(new Boolean[0]);
                case TINYINT:
                    return objectArrayList.toArray(new Byte[0]);
                case SMALLINT:
                    return objectArrayList.toArray(new Short[0]);
                case INT:
                    return objectArrayList.toArray(new Integer[0]);
                case BIGINT:
                    return objectArrayList.toArray(new Long[0]);
                case FLOAT:
                    return objectArrayList.toArray(new Float[0]);
                case DOUBLE:
                    return objectArrayList.toArray(new Double[0]);
                case DECIMAL:
                    return objectArrayList.toArray(new BigDecimal[0]);
                case DATE:
                    return objectArrayList.toArray(new LocalDate[0]);
                case TIME:
                    return objectArrayList.toArray(new LocalTime[0]);
                case TIMESTAMP:
                    return objectArrayList.toArray(new LocalDateTime[0]);
                default:
                    throw new SeaTunnelCsvFormatException(
                            CommonErrorCode.UNSUPPORTED_DATA_TYPE,
                            String.format(
                                    "SeaTunnel array not support this data type [%s]",
                                    elementType.getSqlType()));
            }
        case MAP:
            SeaTunnelDataType<?> keyType = ((MapType<?, ?>) fieldType).getKeyType();
            SeaTunnelDataType<?> valueType = ((MapType<?, ?>) fieldType).getValueType();
            // LinkedHashMap keeps the entries in the order they appear in the text.
            LinkedHashMap<Object, Object> objectMap = new LinkedHashMap<>();
            String[] kvs = field.split(separators[level + 1]);
            for (String kv : kvs) {
                String[] splits = kv.split(separators[level + 2]);
                // String.split drops trailing empty strings, so an entry that consists only
                // of key/value separators produces a zero-length array. Guard both indices
                // instead of reading splits[0] unconditionally; a missing key or value is
                // stored as null, which is also what a blank token converts to.
                Object key =
                        splits.length > 0
                                ? convert(splits[0], keyType, level + 1, fieldName)
                                : null;
                Object value =
                        splits.length > 1
                                ? convert(splits[1], valueType, level + 1, fieldName)
                                : null;
                objectMap.put(key, value);
            }
            return objectMap;
        case STRING:
            return field;
        case BOOLEAN:
            return Boolean.parseBoolean(field);
        case TINYINT:
            return Byte.parseByte(field);
        case SMALLINT:
            return Short.parseShort(field);
        case INT:
            return Integer.parseInt(field);
        case BIGINT:
            return Long.parseLong(field);
        case FLOAT:
            return Float.parseFloat(field);
        case DOUBLE:
            return Double.parseDouble(field);
        case DECIMAL:
            return new BigDecimal(field);
        case NULL:
            return null;
        case BYTES:
            return field.getBytes(StandardCharsets.UTF_8);
        case DATE:
            return parseDate(field, fieldName);
        case TIME:
            return parseTime(field);
        case TIMESTAMP:
            return parseTimestamp(field, fieldName);
        case ROW:
            SeaTunnelRowType rowType = (SeaTunnelRowType) fieldType;
            Map<Integer, String> splitsMap =
                    splitLineBySeaTunnelRowType(field, rowType, level + 1);
            Object[] objects = new Object[splitsMap.size()];
            String[] eleFieldNames = rowType.getFieldNames();
            for (int i = 0; i < objects.length; i++) {
                // Qualify the child name with its parent so nested errors can be located.
                objects[i] =
                        convert(
                                splitsMap.get(i),
                                rowType.getFieldType(i),
                                level + 1,
                                fieldName + "." + eleFieldNames[i]);
            }
            return new SeaTunnelRow(objects);
        default:
            throw CommonError.unsupportedDataType(
                    "SeaTunnel", fieldType.getSqlType().toString(), fieldName);
    }
}