in paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java [99:300]
/**
 * Converts the string form of a value into the internal object matching the given {@link
 * DataType}.
 *
 * <p>Scalar types are parsed directly from {@code s}. ARRAY, MAP and ROW values are parsed as
 * JSON and each nested element is converted by a recursive call with the nested type; JSON text
 * nodes are passed on as their raw text, every other non-null node as its JSON string, and JSON
 * nulls become {@code null} entries.
 *
 * @param s the string to convert
 * @param type the target type; its type root selects the conversion
 * @param isCdcValue when {@code true}, BINARY values are decoded from Base64 instead of being
 *     encoded as UTF-8 bytes; the flag is forwarded unchanged to nested conversions
 * @return the converted value; its runtime class depends on the type root (for example {@code
 *     BinaryString} for CHAR/VARCHAR, {@code byte[]} for BINARY/VARBINARY, boxed numbers for the
 *     numeric types, {@code GenericArray}/{@code GenericMap}/{@code GenericRow} for the nested
 *     types)
 * @throws IllegalArgumentException if the value is longer than a CHAR/VARCHAR/VARBINARY type
 *     allows
 * @throws NumberFormatException if a numeric value cannot be parsed, or a FLOAT value cannot be
 *     represented without precision loss
 * @throws UnsupportedOperationException if the type root is not handled, or a non-JSON ARRAY
 *     string has an element type other than {@code VarCharType}
 * @throws RuntimeException wrapping any non-JSON-syntax failure raised while converting an
 *     ARRAY, MAP or ROW value
 */
public static Object castFromStringInternal(String s, DataType type, boolean isCdcValue) {
    BinaryString str = BinaryString.fromString(s);
    switch (type.getTypeRoot()) {
        case CHAR:
        case VARCHAR:
            int stringLength = DataTypeChecks.getLength(type);
            if (stringLength != VarCharType.MAX_LENGTH && str.numChars() > stringLength) {
                // Report the same length that was compared above, not the UTF-16 length of s.
                throw new IllegalArgumentException(
                        String.format(
                                "Length of type %s is %d, but casting result has a length of %d",
                                type, stringLength, str.numChars()));
            }
            return str;
        case BOOLEAN:
            return BinaryStringUtils.toBoolean(str);
        case BINARY:
            return isCdcValue
                    ? Base64.getDecoder().decode(s)
                    : s.getBytes(StandardCharsets.UTF_8);
        case VARBINARY:
            // NOTE(review): unlike BINARY, this branch never Base64-decodes CDC values;
            // confirm that the asymmetry is intended.
            int binaryLength = DataTypeChecks.getLength(type);
            byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            if (bytes.length > binaryLength) {
                throw new IllegalArgumentException(
                        String.format(
                                "Length of type %s is %d, but casting result has a length of %d",
                                type, binaryLength, bytes.length));
            }
            return bytes;
        case DECIMAL:
            DecimalType decimalType = (DecimalType) type;
            return Decimal.fromBigDecimal(
                    new BigDecimal(s), decimalType.getPrecision(), decimalType.getScale());
        case TINYINT:
            return Byte.valueOf(s);
        case SMALLINT:
            return Short.valueOf(s);
        case INTEGER:
            return Integer.valueOf(s);
        case BIGINT:
            return Long.valueOf(s);
        case FLOAT:
            double d = Double.parseDouble(s);
            if (d == ((float) d)) {
                // The value survives the double -> float narrowing unchanged.
                return (float) d;
            } else {
                // Compatible canal-cdc
                Float f = Float.valueOf(s);
                String floatStr = f.toString();
                if (s.contains(".") && !s.contains("E")) {
                    // Re-render the float with as many fraction digits as the input has.
                    int decimal = s.length() - s.indexOf(".") - 1;
                    // Locale.ROOT keeps '.' as the decimal separator; the default locale may
                    // use ',' which would make the comparison below fail for valid input.
                    floatStr = String.format(java.util.Locale.ROOT, "%." + decimal + "f", f);
                }
                if (!floatStr.equals(s)) {
                    throw new NumberFormatException(
                            s + " cannot be cast to float due to precision loss");
                } else {
                    return f;
                }
            }
        case DOUBLE:
            return Double.valueOf(s);
        case DATE:
            return BinaryStringUtils.toDate(str);
        case TIME_WITHOUT_TIME_ZONE:
            return BinaryStringUtils.toTime(str);
        case TIMESTAMP_WITHOUT_TIME_ZONE:
            TimestampType timestampType = (TimestampType) type;
            return BinaryStringUtils.toTimestamp(str, timestampType.getPrecision());
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
            return BinaryStringUtils.toTimestamp(
                    str, localZonedTimestampType.getPrecision(), TimeZone.getDefault());
        case ARRAY:
            ArrayType arrayType = (ArrayType) type;
            DataType elementType = arrayType.getElementType();
            try {
                JsonNode arrayNode = OBJECT_MAPPER.readTree(s);
                List<Object> resultList = new ArrayList<>();
                for (JsonNode elementNode : arrayNode) {
                    if (!elementNode.isNull()) {
                        // Text nodes are passed on unquoted, other nodes as their JSON string.
                        String elementJson =
                                elementNode.isTextual()
                                        ? elementNode.asText()
                                        : elementNode.toString();
                        resultList.add(
                                castFromStringInternal(elementJson, elementType, isCdcValue));
                    } else {
                        resultList.add(null);
                    }
                }
                return new GenericArray(resultList.toArray());
            } catch (JsonProcessingException e) {
                LOG.info(
                        String.format(
                                "Failed to parse ARRAY for type %s with value %s", type, s),
                        e);
                // try existing code flow: a bracketed, comma-separated list of strings
                if (elementType instanceof VarCharType) {
                    String stripped = s;
                    if (stripped.startsWith("[")) {
                        stripped = stripped.substring(1);
                    }
                    if (stripped.endsWith("]")) {
                        stripped = stripped.substring(0, stripped.length() - 1);
                    }
                    String[] ss = stripped.split(",");
                    BinaryString[] binaryStrings = new BinaryString[ss.length];
                    for (int i = 0; i < ss.length; i++) {
                        binaryStrings[i] = BinaryString.fromString(ss[i]);
                    }
                    return new GenericArray(binaryStrings);
                } else {
                    throw new UnsupportedOperationException("Unsupported type " + type);
                }
            } catch (Exception e) {
                throw new RuntimeException(
                        String.format("Failed to parse Json String %s", s), e);
            }
        case MAP:
            MapType mapType = (MapType) type;
            DataType keyType = mapType.getKeyType();
            DataType valueType = mapType.getValueType();
            try {
                JsonNode mapNode = OBJECT_MAPPER.readTree(s);
                Map<Object, Object> resultMap = new HashMap<>();
                mapNode.fields()
                        .forEachRemaining(
                                entry -> {
                                    Object key =
                                            castFromStringInternal(
                                                    entry.getKey(), keyType, isCdcValue);
                                    JsonNode valueNode = entry.getValue();
                                    Object value = null;
                                    if (!valueNode.isNull()) {
                                        String valueJson =
                                                valueNode.isTextual()
                                                        ? valueNode.asText()
                                                        : valueNode.toString();
                                        value =
                                                castFromStringInternal(
                                                        valueJson, valueType, isCdcValue);
                                    }
                                    resultMap.put(key, value);
                                });
                return new GenericMap(resultMap);
            } catch (JsonProcessingException e) {
                LOG.info(
                        String.format("Failed to parse MAP for type %s with value %s", type, s),
                        e);
                // Best effort: malformed JSON yields an empty map instead of an error.
                return new GenericMap(Collections.emptyMap());
            } catch (Exception e) {
                throw new RuntimeException(
                        String.format("Failed to parse Json String %s", s), e);
            }
        case ROW:
            RowType rowType = (RowType) type;
            try {
                JsonNode rowNode = OBJECT_MAPPER.readTree(s);
                GenericRow genericRow =
                        new GenericRow(
                                rowType.getFields()
                                        .size()); // TODO: What about RowKind? always +I?
                for (int pos = 0; pos < rowType.getFields().size(); ++pos) {
                    DataField field = rowType.getFields().get(pos);
                    // Fields are looked up by name; missing and JSON-null fields become null.
                    JsonNode fieldNode = rowNode.get(field.name());
                    if (fieldNode != null && !fieldNode.isNull()) {
                        String fieldJson =
                                fieldNode.isTextual() ? fieldNode.asText() : fieldNode.toString();
                        genericRow.setField(
                                pos, castFromStringInternal(fieldJson, field.type(), isCdcValue));
                    } else {
                        genericRow.setField(pos, null); // Handle null fields
                    }
                }
                return genericRow;
            } catch (JsonProcessingException e) {
                LOG.info(
                        String.format(
                                "Failed to parse ROW for type %s with value %s", type, s),
                        e);
                // NOTE(review): best effort returns a zero-arity row, which does not match
                // rowType's field count; confirm callers tolerate this.
                return new GenericRow(0);
            } catch (Exception e) {
                throw new RuntimeException(
                        String.format("Failed to parse Json String %s", s), e);
            }
        default:
            throw new UnsupportedOperationException("Unsupported type " + type);
    }
}