in kafka-connect/kafka-connect-transforms/src/main/java/org/debezium/connector/mongodb/transforms/MongoDataConverter.java [215:270]
private void convertFieldValue(
Schema valueSchema, BsonType valueType, BsonValue arrValue, List<Object> list) {
if (arrValue.getBsonType() == BsonType.STRING && valueType == BsonType.STRING) {
String temp = arrValue.asString().getValue();
list.add(temp);
} else if (arrValue.getBsonType() == BsonType.JAVASCRIPT && valueType == BsonType.JAVASCRIPT) {
String temp = arrValue.asJavaScript().getCode();
list.add(temp);
} else if (arrValue.getBsonType() == BsonType.OBJECT_ID && valueType == BsonType.OBJECT_ID) {
String temp = arrValue.asObjectId().getValue().toString();
list.add(temp);
} else if (arrValue.getBsonType() == BsonType.DOUBLE && valueType == BsonType.DOUBLE) {
double temp = arrValue.asDouble().getValue();
list.add(temp);
} else if (arrValue.getBsonType() == BsonType.BINARY && valueType == BsonType.BINARY) {
byte[] temp = arrValue.asBinary().getData();
list.add(temp);
} else if (arrValue.getBsonType() == BsonType.INT32 && valueType == BsonType.INT32) {
int temp = arrValue.asInt32().getValue();
list.add(temp);
} else if (arrValue.getBsonType() == BsonType.INT64 && valueType == BsonType.INT64) {
long temp = arrValue.asInt64().getValue();
list.add(temp);
} else if (arrValue.getBsonType() == BsonType.DATE_TIME && valueType == BsonType.DATE_TIME) {
Date temp = new Date(arrValue.asInt64().getValue());
list.add(temp);
} else if (arrValue.getBsonType() == BsonType.DECIMAL128 && valueType == BsonType.DECIMAL128) {
String temp = arrValue.asDecimal128().getValue().toString();
list.add(temp);
} else if (arrValue.getBsonType() == BsonType.TIMESTAMP && valueType == BsonType.TIMESTAMP) {
Date temp = new Date(1000L * arrValue.asInt32().getValue());
list.add(temp);
} else if (arrValue.getBsonType() == BsonType.BOOLEAN && valueType == BsonType.BOOLEAN) {
boolean temp = arrValue.asBoolean().getValue();
list.add(temp);
} else if (arrValue.getBsonType() == BsonType.DOCUMENT && valueType == BsonType.DOCUMENT) {
Struct struct1 = new Struct(valueSchema);
for (Entry<String, BsonValue> entry9 : arrValue.asDocument().entrySet()) {
convertFieldValue(entry9, struct1, valueSchema);
}
list.add(struct1);
} else if (arrValue.getBsonType() == BsonType.ARRAY && valueType == BsonType.ARRAY) {
List<Object> subList = Lists.newArrayList();
final Schema subValueSchema;
if (Arrays.asList(BsonType.ARRAY, BsonType.DOCUMENT)
.contains(arrValue.asArray().get(0).getBsonType())) {
subValueSchema = valueSchema.valueSchema();
} else {
subValueSchema = null;
}
for (BsonValue v : arrValue.asArray()) {
convertFieldValue(subValueSchema, v.getBsonType(), v, subList);
}
list.add(subList);
}
}