in seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataToBsonConverters.java [103:254]
/**
 * Creates the converter that maps a single field value of the given SeaTunnel type to the
 * corresponding {@link BsonValue}.
 *
 * <p>Scalar types are handled inline; ARRAY, MAP and ROW are delegated to {@code
 * createArrayConverter}, {@code createMapConverter} and {@code createRowConverter}.
 *
 * <p>Except for the NULL converter, the returned functions dereference or cast {@code value}
 * without a null check. NOTE(review): presumably callers wrap the result in a null-safe
 * converter; confirm against the call sites.
 *
 * @param type the SeaTunnel data type of the field to convert
 * @return a serializable function converting a value of {@code type} to BSON
 * @throws MongodbConnectorException if the SQL type of {@code type} is not supported
 */
private static SerializableFunction<Object, BsonValue> createInternalConverter(
        SeaTunnelDataType<?> type) {
    switch (type.getSqlType()) {
        case NULL:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    // The input is ignored: a NULL-typed field is always BSON null.
                    return BsonNull.VALUE;
                }
            };
        case BOOLEAN:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    return new BsonBoolean((boolean) value);
                }
            };
        case TINYINT:
        case SMALLINT:
        case INT:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    // Byte and Short are widened with sign extension so that negative
                    // values keep their sign (masking a Byte with 0xFF would turn -1 into
                    // 255). Anything else must already be an Integer.
                    final int intValue;
                    if (value instanceof Byte) {
                        intValue = ((Byte) value).intValue();
                    } else if (value instanceof Short) {
                        intValue = ((Short) value).intValue();
                    } else {
                        intValue = (int) value;
                    }
                    return new BsonInt32(intValue);
                }
            };
        case BIGINT:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    return new BsonInt64((long) value);
                }
            };
        case FLOAT:
        case DOUBLE:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    // Both FLOAT and DOUBLE are written as a BSON double; a Float is
                    // widened, anything else must already be a Double.
                    double v =
                            value instanceof Float
                                    ? ((Float) value).doubleValue()
                                    : (double) value;
                    return new BsonDouble(v);
                }
            };
        case STRING:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    String val = value.toString();
                    // Try to parse out the MongoDB-specific data type from extended JSON:
                    // a JSON object carrying the ENCODE_VALUE_FIELD key is unwrapped and
                    // the BSON value stored under that key is written instead of the text.
                    if (val.startsWith("{")
                            && val.endsWith("}")
                            && val.contains(ENCODE_VALUE_FIELD)) {
                        try {
                            BsonDocument doc = BsonDocument.parse(val);
                            if (doc.containsKey(ENCODE_VALUE_FIELD)) {
                                return doc.get(ENCODE_VALUE_FIELD);
                            }
                        } catch (JsonParseException ignored) {
                            // Invalid JSON format: fall through and store the text as a
                            // plain BSON string.
                        }
                    }
                    return new BsonString(val);
                }
            };
        case BYTES:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    return new BsonBinary((byte[]) value);
                }
            };
        case DATE:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    // The date is anchored at the start of the day in the JVM default
                    // time zone and stored as epoch milliseconds.
                    LocalDate localDate = (LocalDate) value;
                    return new BsonDateTime(
                            localDate
                                    .atStartOfDay(ZoneId.systemDefault())
                                    .toInstant()
                                    .toEpochMilli());
                }
            };
        case TIMESTAMP:
            return new SerializableFunction<Object, BsonValue>() {
                private static final long serialVersionUID = 1L;

                @Override
                public BsonValue apply(Object value) {
                    // The local timestamp is interpreted in the JVM default time zone and
                    // stored as epoch milliseconds.
                    LocalDateTime localDateTime = (LocalDateTime) value;
                    return new BsonDateTime(
                            localDateTime
                                    .atZone(ZoneId.systemDefault())
                                    .toInstant()
                                    .toEpochMilli());
                }
            };
        case DECIMAL:
            {
                // Resolve precision and scale once at converter creation rather than
                // re-casting the type for every record.
                final DecimalType decimalType = (DecimalType) type;
                final int precision = decimalType.getPrecision();
                final int scale = decimalType.getScale();
                return new SerializableFunction<Object, BsonValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public BsonValue apply(Object value) {
                        BigDecimal decimalVal = (BigDecimal) value;
                        // A null result from fromBigDecimal still raises a
                        // NullPointerException, but now one that names the offending
                        // value and the declared precision/scale.
                        return new BsonDecimal128(
                                new Decimal128(
                                        Objects.requireNonNull(
                                                fromBigDecimal(decimalVal, precision, scale),
                                                () ->
                                                        "fromBigDecimal returned null for decimal value "
                                                                + decimalVal
                                                                + " (precision="
                                                                + precision
                                                                + ", scale="
                                                                + scale
                                                                + ")")));
                    }
                };
            }
        case ARRAY:
            return createArrayConverter((ArrayType<?, ?>) type);
        case MAP:
            MapType<?, ?> mapType = (MapType<?, ?>) type;
            return createMapConverter(
                    mapType.toString(), mapType.getKeyType(), mapType.getValueType());
        case ROW:
            return createRowConverter((SeaTunnelRowType) type);
        default:
            throw new MongodbConnectorException(
                    UNSUPPORTED_DATA_TYPE, "Not support to parse type: " + type);
    }
}