in kafka-connect/kafka-connect-transforms/src/main/java/org/debezium/connector/mongodb/transforms/MongoDataConverter.java [62:211]
/**
 * Converts one BSON field to its Kafka Connect value and stores it in {@code struct} under the
 * entry's key.
 *
 * <p>Scalar BSON types become plain Java values (strings, numbers, booleans, byte arrays, {@link
 * Date}); {@code OBJECT_ID} and {@code DECIMAL128} are stored as their string form. Documents,
 * regular expressions, JavaScript-with-scope values and (depending on {@code arrayEncoding})
 * arrays become nested {@link Struct}s built from the schema registered for the field in {@code
 * schema}. BSON types that have no case below are skipped: nothing is put into {@code struct}.
 *
 * @param keyValueForStruct the field name and BSON value to convert
 * @param struct the struct that receives the converted value
 * @param schema the schema in which nested schemas are looked up by field name; it must be the
 *     schema {@code struct} was created from
 * @throws DataException if a nested schema is needed but {@code schema} has no field with the
 *     entry's key
 */
public void convertFieldValue(
    Entry<String, BsonValue> keyValueForStruct, Struct struct, Schema schema) {
  final String key = keyValueForStruct.getKey();
  final BsonValue value = keyValueForStruct.getValue();
  final Object colValue;
  switch (value.getBsonType()) {
    case NULL:
      colValue = null;
      break;
    case STRING:
      colValue = value.asString().getValue();
      break;
    case OBJECT_ID:
      colValue = value.asObjectId().getValue().toString();
      break;
    case DOUBLE:
      colValue = value.asDouble().getValue();
      break;
    case BINARY:
      colValue = value.asBinary().getData();
      break;
    case INT32:
      colValue = value.asInt32().getValue();
      break;
    case INT64:
      colValue = value.asInt64().getValue();
      break;
    case BOOLEAN:
      colValue = value.asBoolean().getValue();
      break;
    case DATE_TIME:
      colValue = new Date(value.asDateTime().getValue());
      break;
    case JAVASCRIPT:
      colValue = value.asJavaScript().getCode();
      break;
    case JAVASCRIPT_WITH_SCOPE:
      {
        final Schema jsSchema = fieldSchemaOrThrow(schema, key);
        final Schema scopeSchema = fieldSchemaOrThrow(jsSchema, "scope");
        final Struct jsStruct = new Struct(jsSchema);
        final Struct jsScopeStruct = new Struct(scopeSchema);
        jsStruct.put("code", value.asJavaScriptWithScope().getCode());
        final BsonDocument jwsDoc = value.asJavaScriptWithScope().getScope().asDocument();
        for (Entry<String, BsonValue> scopeEntry : jwsDoc.entrySet()) {
          // The scope variables live in jsScopeStruct, so nested schemas must be resolved against
          // the scope schema rather than the enclosing JavaScript schema.
          convertFieldValue(scopeEntry, jsScopeStruct, scopeSchema);
        }
        jsStruct.put("scope", jsScopeStruct);
        colValue = jsStruct;
        break;
      }
    case REGULAR_EXPRESSION:
      {
        final Struct regexStruct = new Struct(fieldSchemaOrThrow(schema, key));
        regexStruct.put("regex", value.asRegularExpression().getPattern());
        regexStruct.put("options", value.asRegularExpression().getOptions());
        colValue = regexStruct;
        break;
      }
    case TIMESTAMP:
      // The timestamp's time component is multiplied by 1000 to obtain the Date's milliseconds.
      colValue = new Date(1000L * value.asTimestamp().getTime());
      break;
    case DECIMAL128:
      colValue = value.asDecimal128().getValue().toString();
      break;
    case DOCUMENT:
      {
        final Schema documentSchema = fieldSchemaOrThrow(schema, key);
        final Struct documentStruct = new Struct(documentSchema);
        final BsonDocument document = value.asDocument();
        for (Entry<String, BsonValue> nested : document.entrySet()) {
          convertFieldValue(nested, documentStruct, documentSchema);
        }
        colValue = documentStruct;
        break;
      }
    case ARRAY:
      colValue = convertArrayValue(key, value.asArray(), schema);
      break;
    default:
      // No conversion is defined for the remaining BSON types; the field is left unset.
      return;
  }
  struct.put(key, value.isNull() ? null : colValue);
}

/**
 * Converts a BSON array according to {@code arrayEncoding}: either to a list of converted
 * elements or to a struct with one field per element.
 */
private Object convertArrayValue(String key, BsonArray array, Schema schema) {
  switch (arrayEncoding) {
    case ARRAY:
      {
        final List<Object> list = Lists.newArrayList();
        if (array.isEmpty()) {
          return list;
        }
        // Every element is converted using the BSON type of the first element.
        final BsonType valueType = array.get(0).getBsonType();
        // Only nested arrays/documents need an element schema; resolve it once, not per element.
        final Schema valueSchema;
        if (Arrays.asList(BsonType.ARRAY, BsonType.DOCUMENT).contains(valueType)) {
          valueSchema = fieldSchemaOrThrow(schema, key).valueSchema();
        } else {
          valueSchema = null;
        }
        for (BsonValue arrValue : array.getValues()) {
          convertFieldValue(valueSchema, valueType, arrValue, list);
        }
        return list;
      }
    case DOCUMENT:
      {
        final Schema arraySchema = fieldSchemaOrThrow(schema, key);
        final Struct arrayStruct = new Struct(arraySchema);
        // Each element becomes a struct field named after its index; an empty array yields a
        // struct with no fields set.
        final Map<String, BsonValue> convertedArray = Maps.newHashMap();
        for (int i = 0; i < array.size(); i++) {
          convertedArray.put(arrayElementStructName(i), array.get(i));
        }
        for (Entry<String, BsonValue> element : convertedArray.entrySet()) {
          convertFieldValue(element, arrayStruct, arraySchema);
        }
        return arrayStruct;
      }
    default:
      return null;
  }
}

/** Returns the schema of field {@code name} in {@code schema}, failing clearly if it is absent. */
private static Schema fieldSchemaOrThrow(Schema schema, String name) {
  final Field field = schema.field(name);
  if (field == null) {
    throw new DataException("Failed to find field '" + name + "' in schema " + schema.name());
  }
  return field.schema();
}