in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/RowDataToBsonConverters.java [126:250]
/**
 * Builds a converter that maps a Flink internal value of the given logical type to a
 * {@link BsonValue}.
 *
 * <p>Mapping by type root:
 *
 * <ul>
 *   <li>{@code NULL} -> {@code BsonNull.VALUE} (the input is ignored)
 *   <li>{@code BOOLEAN} -> {@code BsonBoolean}
 *   <li>{@code INTEGER}, {@code INTERVAL_YEAR_MONTH} -> {@code BsonInt32}
 *   <li>{@code BIGINT}, {@code INTERVAL_DAY_TIME} -> {@code BsonInt64}
 *   <li>{@code DOUBLE} -> {@code BsonDouble}
 *   <li>{@code DECIMAL} -> {@code BsonDecimal128}
 *   <li>{@code CHAR}, {@code VARCHAR} -> {@code BsonString}, unless the text is a JSON document
 *       holding the {@code ENCODE_VALUE_FIELD} key, in which case the value stored under that
 *       key is returned as-is
 *   <li>{@code BINARY}, {@code VARBINARY} -> {@code BsonBinary}
 *   <li>{@code TIMESTAMP_WITHOUT_TIME_ZONE}, {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE} -> {@code
 *       BsonDateTime}
 *   <li>{@code ROW}, {@code ARRAY}, {@code MAP} -> delegated to the dedicated converter factories
 * </ul>
 *
 * <p>The converters created directly in this method perform no null check on their input;
 * presumably null handling is layered on by the caller (TODO confirm).
 *
 * @param type the logical type whose values the returned converter accepts
 * @return a serializable function converting one value of {@code type} to BSON
 * @throws UnsupportedOperationException for {@code MULTISET}, {@code RAW} and every other type
 *     root that has no branch here
 */
private static SerializableFunction<Object, BsonValue> createInternalConverter(
        LogicalType type) {
    switch (type.getTypeRoot()) {
        case NULL:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    // Whatever comes in, a NULL-typed column is always written as BSON null.
                    return BsonNull.VALUE;
                }
            };
        case BOOLEAN:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    final boolean flag = (Boolean) value;
                    return new BsonBoolean(flag);
                }
            };
        case INTEGER:
        case INTERVAL_YEAR_MONTH:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    final int number = (Integer) value;
                    return new BsonInt32(number);
                }
            };
        case BIGINT:
        case INTERVAL_DAY_TIME:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    final long number = (Long) value;
                    return new BsonInt64(number);
                }
            };
        case DOUBLE:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    final double number = (Double) value;
                    return new BsonDouble(number);
                }
            };
        case DECIMAL:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    final DecimalData decimalData = (DecimalData) value;
                    final BigDecimal asBigDecimal = decimalData.toBigDecimal();
                    final Decimal128 decimal128 = new Decimal128(asBigDecimal);
                    return new BsonDecimal128(decimal128);
                }
            };
        case CHAR:
        case VARCHAR:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    final String text = value.toString();

                    // Cheap pre-check before parsing: only text that is shaped like a JSON
                    // object and mentions the marker field can carry an encoded BSON value.
                    final boolean mayCarryEncodedValue =
                            text.startsWith("{")
                                    && text.endsWith("}")
                                    && text.contains(ENCODE_VALUE_FIELD);

                    if (mayCarryEncodedValue) {
                        try {
                            final BsonDocument parsed = BsonDocument.parse(text);
                            if (parsed.containsKey(ENCODE_VALUE_FIELD)) {
                                // Unwrap the MongoDB-specific value stored under the marker.
                                return parsed.get(ENCODE_VALUE_FIELD);
                            }
                        } catch (JsonParseException ignored) {
                            // Not valid extended JSON after all: fall through and store
                            // the text as a plain BSON string.
                        }
                    }
                    return new BsonString(text);
                }
            };
        case BINARY:
        case VARBINARY:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    final byte[] bytes = (byte[]) value;
                    return new BsonBinary(bytes);
                }
            };
        case TIMESTAMP_WITHOUT_TIME_ZONE:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    final TimestampData timestamp = (TimestampData) value;
                    // NOTE(review): unlike the LOCAL_TIME_ZONE branch this goes through
                    // toTimestamp(); presumably the resulting millis depend on the JVM
                    // default time zone - confirm against the read path.
                    final long epochMillis = timestamp.toTimestamp().getTime();
                    return new BsonDateTime(epochMillis);
                }
            };
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    final TimestampData timestamp = (TimestampData) value;
                    final long epochMillis = timestamp.getMillisecond();
                    return new BsonDateTime(epochMillis);
                }
            };
        case ROW:
            return createRowConverter((RowType) type);
        case ARRAY:
            return createArrayConverter((ArrayType) type);
        case MAP:
            return createMapConverter((MapType) type);
        case MULTISET:
        case RAW:
        default:
            // Listed explicitly to document that these are known but intentionally rejected.
            throw new UnsupportedOperationException("Unsupported type:" + type);
    }
}