in seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java [116:213]
/**
 * Converts a value read from MaxCompute into the Java representation used for the given
 * SeaTunnel data type.
 *
 * <ul>
 *   <li>ARRAY values become a typed Java array (String, Boolean, Integer, Long, Float or
 *       Double); the elements themselves are not converted.
 *   <li>MAP values become a new {@link HashMap} whose keys and values are converted
 *       recursively.
 *   <li>ROW values become a {@code SeaTunnelRow} whose fields are converted recursively.
 *   <li>Numeric, boolean and decimal values are returned unchanged.
 *   <li>DATE, TIME and TIMESTAMP values are converted to {@code java.time} local types;
 *       instants and legacy dates are interpreted in the JVM default time zone.
 * </ul>
 *
 * @param field the raw value; may be {@code null}
 * @param fieldType the SeaTunnel type describing how {@code field} should be represented
 * @return the converted value, or {@code null} when {@code field} is {@code null}
 * @throws MaxcomputeConnectorException if the type, or the element type of an array, is not
 *     supported
 */
private static Object resolveObject2SeaTunnel(Object field, SeaTunnelDataType<?> fieldType) {
    if (field == null) {
        return null;
    }
    switch (fieldType.getSqlType()) {
        case ARRAY:
            // Accept any List implementation instead of requiring a concrete ArrayList.
            List<?> origArray = (List<?>) field;
            SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
            switch (elementType.getSqlType()) {
                case STRING:
                    return origArray.toArray(new String[0]);
                case BOOLEAN:
                    return origArray.toArray(new Boolean[0]);
                case INT:
                    return origArray.toArray(new Integer[0]);
                case BIGINT:
                    return origArray.toArray(new Long[0]);
                case FLOAT:
                    return origArray.toArray(new Float[0]);
                case DOUBLE:
                    return origArray.toArray(new Double[0]);
                default:
                    // Report the offending element type; the outer type is always ARRAY here.
                    throw new MaxcomputeConnectorException(
                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                            String.format(
                                    "SeaTunnel type not support this array element type [%s] now",
                                    elementType.getSqlType().name()));
            }
        case MAP:
            SeaTunnelDataType<?> keyType = ((MapType<?, ?>) fieldType).getKeyType();
            SeaTunnelDataType<?> valueType = ((MapType<?, ?>) fieldType).getValueType();
            // Accept any Map implementation instead of requiring a concrete HashMap.
            java.util.Map<?, ?> origDataMap = (java.util.Map<?, ?>) field;
            HashMap<Object, Object> dataMap = new HashMap<>();
            for (java.util.Map.Entry<?, ?> entry : origDataMap.entrySet()) {
                dataMap.put(
                        resolveObject2SeaTunnel(entry.getKey(), keyType),
                        resolveObject2SeaTunnel(entry.getValue(), valueType));
            }
            return dataMap;
        case ROW:
            SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) fieldType).getFieldTypes();
            List<Object> fieldValues = ((SimpleStruct) field).getFieldValues();
            Object[] objects = new Object[fieldTypes.length];
            for (int i = 0; i < fieldTypes.length; i++) {
                objects[i] = resolveObject2SeaTunnel(fieldValues.get(i), fieldTypes[i]);
            }
            return new SeaTunnelRow(objects);
        case TINYINT:
        case SMALLINT:
        case INT:
        case FLOAT:
        case DOUBLE:
        case BIGINT:
        case BOOLEAN:
        case DECIMAL:
            return field;
        case BYTES:
            // Raw byte arrays are passed through; otherwise unwrap the Binary holder.
            if (field instanceof byte[]) {
                return field;
            }
            return ((Binary) field).data();
        case STRING:
            if (field instanceof byte[]) {
                // Decode with an explicit charset so the result does not depend on the
                // platform default. NOTE(review): assumes the bytes are UTF-8 - confirm.
                return new String((byte[]) field, java.nio.charset.StandardCharsets.UTF_8);
            }
            if (field instanceof Char) {
                // NOTE(review): rtrim presumably strips the trailing padding of
                // fixed-length CHAR values - confirm against its definition.
                return rtrim(String.valueOf(field));
            }
            return String.valueOf(field);
        case DATE:
            if (field instanceof LocalDate) {
                return field;
            }
            return ((Date) field).toLocalDate();
        case TIME:
            return ((Time) field).toLocalTime();
        case TIMESTAMP:
            if (field instanceof Instant) {
                return ((Instant) field).atZone(ZoneId.systemDefault()).toLocalDateTime();
            }
            if (field instanceof ZonedDateTime) {
                return ((ZonedDateTime) field).toLocalDateTime();
            }
            if (field instanceof LocalDateTime) {
                return field;
            }
            // Remaining case is a legacy java.util.Date (including java.sql.Timestamp).
            return ((java.util.Date) field)
                    .toInstant()
                    .atZone(ZoneId.systemDefault())
                    .toLocalDateTime();
        case NULL:
        default:
            throw new MaxcomputeConnectorException(
                    CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                    String.format(
                            "SeaTunnel type not support this type [%s] now",
                            fieldType.getSqlType().name()));
    }
}