in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorDeserializationSchema.java [227:274]
/**
 * Picks the runtime converter that matches the root of the given logical type.
 *
 * <p>Scalar roots are bound directly to the corresponding {@code convertToXxx} method of this
 * instance; DECIMAL, ROW, ARRAY and MAP are delegated to dedicated factory methods that
 * receive the down-cast type.
 *
 * <p>NOTE(review): judging by the name, null handling is presumably layered on by the caller;
 * confirm against the call site.
 *
 * @param type logical type whose root decides which converter is produced
 * @return the converter selected for {@code type}; for the NULL root, a converter that always
 *     yields {@code null}
 * @throws UnsupportedOperationException if the type root is MULTISET, RAW, or any root not
 *     listed in the switch below
 */
private DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
    final DeserializationRuntimeConverter converter;
    switch (type.getTypeRoot()) {
        // --- constant / boolean ---
        case NULL:
            converter = (ignored) -> null;
            break;
        case BOOLEAN:
            converter = this::convertToBoolean;
            break;

        // --- integral numbers (interval types share the int/long representation) ---
        case TINYINT:
            converter = this::convertToTinyInt;
            break;
        case SMALLINT:
            converter = this::convertToSmallInt;
            break;
        case INTEGER:
        case INTERVAL_YEAR_MONTH:
            converter = this::convertToInt;
            break;
        case BIGINT:
        case INTERVAL_DAY_TIME:
            converter = this::convertToLong;
            break;

        // --- fractional numbers ---
        case FLOAT:
            converter = this::convertToFloat;
            break;
        case DOUBLE:
            converter = this::convertToDouble;
            break;
        case DECIMAL:
            converter = createDecimalConverter((DecimalType) type);
            break;

        // --- date and time ---
        case DATE:
            converter = this::convertToDate;
            break;
        case TIME_WITHOUT_TIME_ZONE:
            converter = this::convertToTime;
            break;
        case TIMESTAMP_WITHOUT_TIME_ZONE:
            converter = this::convertToTimestamp;
            break;
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            converter = this::convertToLocalTimeZoneTimestamp;
            break;

        // --- character and binary strings ---
        case CHAR:
        case VARCHAR:
            converter = this::convertToString;
            break;
        case BINARY:
        case VARBINARY:
            converter = this::convertToBinary;
            break;

        // --- nested / composite types ---
        case ROW:
            converter = createRowConverter((RowType) type);
            break;
        case ARRAY:
            converter = createArrayConverter((ArrayType) type);
            break;
        case MAP:
            converter = createMapConverter((MapType) type);
            break;

        // --- explicitly unsupported, together with every root not listed above ---
        case MULTISET:
        case RAW:
        default:
            throw new UnsupportedOperationException("Unsupported type: " + type);
    }
    return converter;
}