in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorDeserializationSchema.java [567:657]
/**
 * Renders a BSON value as {@link StringData}.
 *
 * <p>Each supported BSON type has its own textual form: strings, JavaScript code and symbols
 * are passed through; documents and arrays are written as JSON; numbers and booleans use their
 * Java string form; object ids and DB pointer ids are hex strings; date-times and timestamps
 * are formatted with {@code ISO_OFFSET_DATE_TIME}; regular expressions become
 * {@code /pattern/options}; min/max keys become the name of their BSON type.
 *
 * <p>Binary values with the standard UUID subtype are rendered in canonical UUID text form.
 * All other binary values, including legacy-subtype UUIDs, are rendered as a hex string of the
 * raw bytes.
 *
 * @param docObj the BSON value to convert
 * @return the string representation of {@code docObj}
 * @throws IllegalArgumentException if the BSON type of {@code docObj} is not handled here
 */
private StringData convertToString(BsonValue docObj) {
    if (docObj.isString()) {
        return StringData.fromString(docObj.asString().getValue());
    }
    if (docObj.isDocument()) {
        // convert document to json string
        return StringData.fromString(docObj.asDocument().toJson());
    }
    if (docObj.isBinary()) {
        BsonBinary bsonBinary = docObj.asBinary();
        // Only the standard UUID subtype is decoded via asUuid(). The no-argument asUuid()
        // throws for the legacy UUID subtype because the byte order of a legacy UUID depends
        // on which driver wrote it, so legacy UUIDs fall through to the raw hex form instead
        // of failing the whole record.
        if (bsonBinary.getType() == BsonBinarySubType.UUID_STANDARD.getValue()) {
            return StringData.fromString(bsonBinary.asUuid().toString());
        }
        return StringData.fromString(HexUtils.toHex(bsonBinary.getData()));
    }
    if (docObj.isObjectId()) {
        return StringData.fromString(docObj.asObjectId().getValue().toHexString());
    }
    if (docObj.isInt32()) {
        return StringData.fromString(String.valueOf(docObj.asInt32().getValue()));
    }
    if (docObj.isInt64()) {
        return StringData.fromString(String.valueOf(docObj.asInt64().getValue()));
    }
    if (docObj.isDouble()) {
        return StringData.fromString(String.valueOf(docObj.asDouble().getValue()));
    }
    if (docObj.isDecimal128()) {
        return StringData.fromString(docObj.asDecimal128().getValue().toString());
    }
    if (docObj.isBoolean()) {
        return StringData.fromString(String.valueOf(docObj.asBoolean().getValue()));
    }
    if (docObj.isDateTime()) {
        Instant instant = convertToInstant(docObj.asDateTime());
        return StringData.fromString(
                convertInstantToZonedDateTime(instant).format(ISO_OFFSET_DATE_TIME));
    }
    if (docObj.isTimestamp()) {
        Instant instant = convertToInstant(docObj.asTimestamp());
        return StringData.fromString(
                convertInstantToZonedDateTime(instant).format(ISO_OFFSET_DATE_TIME));
    }
    if (docObj.isArray()) {
        // convert bson array to json string
        Writer writer = new StringWriter();
        // The array start/end hooks are overridden to call the low-level doWrite* methods and
        // set the writer state directly.
        // NOTE(review): presumably this is needed because the stock JsonWriter does not
        // accept an array as the top-level value - confirm against the driver.
        JsonWriter jsonArrayWriter =
                new JsonWriter(writer) {
                    @Override
                    public void writeStartArray() {
                        doWriteStartArray();
                        setState(State.VALUE);
                    }

                    @Override
                    public void writeEndArray() {
                        doWriteEndArray();
                        setState(getNextState());
                    }
                };
        new BsonArrayCodec()
                .encode(jsonArrayWriter, docObj.asArray(), EncoderContext.builder().build());
        return StringData.fromString(writer.toString());
    }
    if (docObj.isRegularExpression()) {
        BsonRegularExpression regex = docObj.asRegularExpression();
        return StringData.fromString(
                String.format("/%s/%s", regex.getPattern(), regex.getOptions()));
    }
    if (docObj.isJavaScript()) {
        return StringData.fromString(docObj.asJavaScript().getCode());
    }
    if (docObj.isJavaScriptWithScope()) {
        // Only the code is emitted; the scope document is not part of the result.
        return StringData.fromString(docObj.asJavaScriptWithScope().getCode());
    }
    if (docObj.isSymbol()) {
        return StringData.fromString(docObj.asSymbol().getSymbol());
    }
    if (docObj.isDBPointer()) {
        // Only the referenced id is emitted.
        return StringData.fromString(docObj.asDBPointer().getId().toHexString());
    }
    if (docObj instanceof BsonMinKey || docObj instanceof BsonMaxKey) {
        return StringData.fromString(docObj.getBsonType().name());
    }
    throw new IllegalArgumentException(
            "Unable to convert to string from unexpected value '"
                    + docObj
                    + "' of type "
                    + docObj.getBsonType());
}