in kafka-connect/kafka-connect-transforms/src/main/java/org/debezium/connector/mongodb/transforms/MongoDataConverter.java [276:389]
/**
 * Adds one field to {@code builder}, choosing the Connect schema from the BSON type of the
 * supplied sample value.
 *
 * <p>Mapping performed by this method:
 *
 * <ul>
 *   <li>NULL, STRING, JAVASCRIPT, OBJECT_ID, DECIMAL128: optional string
 *   <li>DOUBLE: optional float64; INT32: optional int32; INT64: optional int64
 *   <li>BINARY: optional bytes; BOOLEAN: optional boolean
 *   <li>DATE_TIME, TIMESTAMP: optional Connect {@code Timestamp} logical type
 *   <li>JAVASCRIPT_WITH_SCOPE: struct named {@code builder.name() + "." + key} with a {@code
 *       code} string field and a {@code scope} struct built recursively from the scope document
 *   <li>REGULAR_EXPRESSION: optional struct named {@code SCHEMA_NAME_REGEX} with {@code regex}
 *       and {@code options} string fields
 *   <li>DOCUMENT: optional struct named {@code builder.name() + "." + key}, built recursively
 *   <li>ARRAY: depends on {@code arrayEncoding}; either a Connect array or a struct with one
 *       field per element
 * </ul>
 *
 * <p>Any other BSON type adds no field.
 *
 * @param keyValuesForSchema entry whose key is the field name and whose value is the BSON value
 *     the schema is inferred from
 * @param builder struct schema builder that receives the new field
 */
public void addFieldSchema(Entry<String, BsonValue> keyValuesForSchema, SchemaBuilder builder) {
  String key = keyValuesForSchema.getKey();
  BsonValue bsonValue = keyValuesForSchema.getValue();
  BsonType type = bsonValue.getBsonType();
  switch (type) {
    case NULL:
    case STRING:
    case JAVASCRIPT:
    case OBJECT_ID:
    case DECIMAL128:
      builder.field(key, Schema.OPTIONAL_STRING_SCHEMA);
      break;
    case DOUBLE:
      builder.field(key, Schema.OPTIONAL_FLOAT64_SCHEMA);
      break;
    case BINARY:
      builder.field(key, Schema.OPTIONAL_BYTES_SCHEMA);
      break;
    case INT32:
      builder.field(key, Schema.OPTIONAL_INT32_SCHEMA);
      break;
    case INT64:
      builder.field(key, Schema.OPTIONAL_INT64_SCHEMA);
      break;
    case DATE_TIME:
    case TIMESTAMP:
      builder.field(key, org.apache.kafka.connect.data.Timestamp.builder().optional().build());
      break;
    case BOOLEAN:
      builder.field(key, Schema.OPTIONAL_BOOLEAN_SCHEMA);
      break;
    case JAVASCRIPT_WITH_SCOPE:
      {
        SchemaBuilder jsWithScope = SchemaBuilder.struct().name(builder.name() + "." + key);
        jsWithScope.field("code", Schema.OPTIONAL_STRING_SCHEMA);
        SchemaBuilder scope =
            SchemaBuilder.struct().name(jsWithScope.name() + ".scope").optional();
        BsonDocument scopeDocument = bsonValue.asJavaScriptWithScope().getScope().asDocument();
        for (Entry<String, BsonValue> scopeEntry : scopeDocument.entrySet()) {
          addFieldSchema(scopeEntry, scope);
        }
        jsWithScope.field("scope", scope.build());
        // Register the built Schema rather than the still-mutable builder, so this field is
        // handled like every other field added by this method.
        builder.field(key, jsWithScope.build());
        break;
      }
    case REGULAR_EXPRESSION:
      {
        SchemaBuilder regex = SchemaBuilder.struct().name(SCHEMA_NAME_REGEX).optional();
        regex.field("regex", Schema.OPTIONAL_STRING_SCHEMA);
        regex.field("options", Schema.OPTIONAL_STRING_SCHEMA);
        builder.field(key, regex.build());
        break;
      }
    case DOCUMENT:
      {
        SchemaBuilder nested = SchemaBuilder.struct().name(builder.name() + "." + key).optional();
        for (Entry<String, BsonValue> nestedEntry : bsonValue.asDocument().entrySet()) {
          addFieldSchema(nestedEntry, nested);
        }
        builder.field(key, nested.build());
        break;
      }
    case ARRAY:
      {
        BsonArray array = bsonValue.asArray();
        switch (arrayEncoding) {
          case ARRAY:
            if (array.isEmpty()) {
              // No element to inspect, so the element schema falls back to optional string.
              builder.field(
                  key, SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build());
            } else {
              // The first element's type is handed to testType and subSchema as the array's
              // element type.
              BsonType elementType = array.get(0).getBsonType();
              testType(builder, key, bsonValue, elementType);
              builder.field(
                  key,
                  SchemaBuilder.array(subSchema(builder, key, elementType, array))
                      .optional()
                      .build());
            }
            break;
          case DOCUMENT:
            {
              // An empty array simply produces a struct without fields.
              SchemaBuilder arrayStruct =
                  SchemaBuilder.struct().name(builder.name() + "." + key).optional();
              // Insertion-ordered map so struct fields follow array index order instead of
              // hash order.
              Map<String, BsonValue> elementsByName = Maps.newLinkedHashMap();
              for (int i = 0; i < array.size(); i++) {
                elementsByName.put(arrayElementStructName(i), array.get(i));
              }
              for (Entry<String, BsonValue> element : elementsByName.entrySet()) {
                addFieldSchema(element, arrayStruct);
              }
              builder.field(key, arrayStruct.build());
              break;
            }
        }
        break;
      }
    default:
      // Remaining BSON types are intentionally left without a field.
      break;
  }
}