in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java [129:240]
private static SerializableFunction<BsonValue, Object> createInternalConverter(
LogicalType type) {
switch (type.getTypeRoot()) {
case NULL:
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object apply(BsonValue bsonValue) {
return null;
}
};
case BOOLEAN:
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object apply(BsonValue bsonValue) {
return convertToBoolean(bsonValue);
}
};
case INTEGER:
case INTERVAL_YEAR_MONTH:
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object apply(BsonValue bsonValue) {
return convertToInt(bsonValue);
}
};
case BIGINT:
case INTERVAL_DAY_TIME:
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object apply(BsonValue bsonValue) {
return convertToLong(bsonValue);
}
};
case TIMESTAMP_WITHOUT_TIME_ZONE:
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object apply(BsonValue bsonValue) {
return TimestampData.fromLocalDateTime(convertToLocalDateTime(bsonValue));
}
};
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object apply(BsonValue bsonValue) {
return TimestampData.fromInstant(convertToInstant(bsonValue));
}
};
case DOUBLE:
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object apply(BsonValue bsonValue) {
return convertToDouble(bsonValue);
}
};
case CHAR:
case VARCHAR:
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object apply(BsonValue bsonValue) {
return StringData.fromString(convertToString(bsonValue));
}
};
case BINARY:
case VARBINARY:
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object apply(BsonValue bsonValue) {
return convertToBinary(bsonValue);
}
};
case DECIMAL:
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object apply(BsonValue bsonValue) {
DecimalType decimalType = (DecimalType) type;
BigDecimal decimalValue = convertToBigDecimal(bsonValue);
return DecimalData.fromBigDecimal(
decimalValue, decimalType.getPrecision(), decimalType.getScale());
}
};
case ROW:
return createRowConverter((RowType) type);
case ARRAY:
return createArrayConverter((ArrayType) type);
case MAP:
return createMapConverter((MapType) type);
case MULTISET:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}