in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlussRowToJsonConverters.java [70:116]
/**
 * Picks the converter for {@code type} by dispatching on {@code type.getTypeRoot()}.
 *
 * <p>Scalar roots are handled inline: the supplied value is cast (or, for character types,
 * rendered with {@code toString()}) and wrapped in a node obtained from the mapper's node
 * factory. Decimal, temporal and nested roots (array, map, row) are delegated to dedicated
 * factory methods of this class.
 *
 * <p>None of the inline converters checks the value for {@code null}; as the method name
 * suggests, null handling is presumably done by the caller (NOTE(review): confirm).
 *
 * @param type the data type whose type root selects the converter
 * @return a converter producing the JSON node for values of {@code type}
 * @throws UnsupportedOperationException if the type root matches none of the handled cases
 */
private FlussRowToJsonConverter createNotNullConverter(DataType type) {
    switch (type.getTypeRoot()) {
        // ---- textual -------------------------------------------------------------
        case CHAR:
        case STRING:
            return (jsonMapper, reusable, field) ->
                    jsonMapper.getNodeFactory().textNode(field.toString());

        // ---- boolean / raw bytes -------------------------------------------------
        case BOOLEAN:
            return (jsonMapper, reusable, field) ->
                    jsonMapper.getNodeFactory().booleanNode((boolean) field);
        case BINARY:
        case BYTES:
            return (jsonMapper, reusable, field) ->
                    jsonMapper.getNodeFactory().binaryNode((byte[]) field);

        // ---- numeric -------------------------------------------------------------
        // The casts target primitives on purpose: they decide which numberNode(...)
        // overload is invoked for each width.
        case TINYINT:
            return (jsonMapper, reusable, field) ->
                    jsonMapper.getNodeFactory().numberNode((byte) field);
        case SMALLINT:
            return (jsonMapper, reusable, field) ->
                    jsonMapper.getNodeFactory().numberNode((short) field);
        case INTEGER:
            return (jsonMapper, reusable, field) ->
                    jsonMapper.getNodeFactory().numberNode((int) field);
        case BIGINT:
            return (jsonMapper, reusable, field) ->
                    jsonMapper.getNodeFactory().numberNode((long) field);
        case FLOAT:
            return (jsonMapper, reusable, field) ->
                    jsonMapper.getNodeFactory().numberNode((float) field);
        case DOUBLE:
            return (jsonMapper, reusable, field) ->
                    jsonMapper.getNodeFactory().numberNode((double) field);
        case DECIMAL:
            return createDecimalConverter();

        // ---- temporal ------------------------------------------------------------
        case DATE:
            return createDateConverter();
        case TIME_WITHOUT_TIME_ZONE:
            return createTimeConverter();
        case TIMESTAMP_WITHOUT_TIME_ZONE:
            return createTimestampConverter();
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            return createTimestampLtzConverter();

        // ---- nested --------------------------------------------------------------
        case ROW:
            return createRowConverter((RowType) type);
        case ARRAY:
            return createArrayConverter((ArrayType) type);
        case MAP:
            {
                // The map converter receives the summary string of the map type together
                // with its key and value types.
                final MapType asMap = (MapType) type;
                return createMapConverter(
                        asMap.asSummaryString(), asMap.getKeyType(), asMap.getValueType());
            }

        default:
            throw new UnsupportedOperationException("Not support to parse type: " + type);
    }
}