in inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java [113:176]
private JsonToRowDataConverter createNotNullConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case NULL:
return jsonNode -> null;
case BOOLEAN:
return this::convertToBoolean;
case TINYINT:
return jsonNode -> Byte.parseByte(jsonNode.asText().trim());
case SMALLINT:
return jsonNode -> Short.parseShort(jsonNode.asText().trim());
case INTEGER:
case INTERVAL_YEAR_MONTH:
return this::convertToInt;
case BIGINT:
case INTERVAL_DAY_TIME:
return this::convertToLong;
case DATE:
return this::convertToDate;
case TIME_WITHOUT_TIME_ZONE:
return this::convertToTime;
case TIMESTAMP_WITHOUT_TIME_ZONE:
return this::convertToTimestamp;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
if (adaptSpark) {
return jsonNode -> {
try {
return convertToTimestampWithLocalZone(jsonNode);
} catch (DateTimeParseException e) {
return convertToTimestamp(jsonNode);
}
};
}
return this::convertToTimestampWithLocalZone;
case FLOAT:
return this::convertToFloat;
case DOUBLE:
return this::convertToDouble;
case CHAR:
case VARCHAR:
return this::convertToString;
case BINARY:
case VARBINARY:
return this::convertToBytes;
case DECIMAL:
return createDecimalConverter((DecimalType) type);
case ARRAY:
return createArrayConverter((ArrayType) type);
case MAP:
MapType mapType = (MapType) type;
return createMapConverter(
mapType.asSummaryString(), mapType.getKeyType(), mapType.getValueType());
case MULTISET:
MultisetType multisetType = (MultisetType) type;
return createMapConverter(
multisetType.asSummaryString(),
multisetType.getElementType(),
new IntType());
case ROW:
return createRowConverter((RowType) type);
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}