public void convertFieldValue()

in kafka-connect/kafka-connect-transforms/src/main/java/org/debezium/connector/mongodb/transforms/MongoDataConverter.java [62:211]


  /**
   * Converts one BSON field into its Kafka Connect value and stores it in {@code struct} under
   * the entry's key.
   *
   * <p>Scalar BSON types are mapped directly. Composite types (JavaScript with scope, regular
   * expressions, documents and arrays) are built as nested {@link Struct}s or lists using the
   * schema registered for the field in {@code schema}; nested documents are converted
   * recursively. Arrays are encoded according to {@code arrayEncoding}: either as a list, where
   * the BSON type of the first element is applied to every element, or as a struct with one field
   * per array index.
   *
   * <p>BSON types without a case below are skipped: nothing is written to {@code struct}.
   *
   * @param keyValueForStruct the field name and BSON value to convert
   * @param struct the struct that receives the converted value
   * @param schema the schema in which the field's own schema is looked up for composite types
   * @throws DataException if a composite value needs a field schema that {@code schema} does not
   *     contain
   */
  public void convertFieldValue(
      Entry<String, BsonValue> keyValueForStruct, Struct struct, Schema schema) {
    final String key = keyValueForStruct.getKey();
    final BsonValue value = keyValueForStruct.getValue();
    Object colValue = null;

    switch (value.getBsonType()) {
      case NULL:
        colValue = null;
        break;

      case STRING:
        colValue = value.asString().getValue();
        break;

      case OBJECT_ID:
        colValue = value.asObjectId().getValue().toString();
        break;

      case DOUBLE:
        colValue = value.asDouble().getValue();
        break;

      case BINARY:
        colValue = value.asBinary().getData();
        break;

      case INT32:
        colValue = value.asInt32().getValue();
        break;

      case INT64:
        colValue = value.asInt64().getValue();
        break;

      case BOOLEAN:
        colValue = value.asBoolean().getValue();
        break;

      case DATE_TIME:
        colValue = new Date(value.asDateTime().getValue());
        break;

      case JAVASCRIPT:
        colValue = value.asJavaScript().getCode();
        break;

      case JAVASCRIPT_WITH_SCOPE:
        {
          final Schema jsSchema = requiredFieldSchema(schema, key);
          final Schema scopeSchema = requiredFieldSchema(jsSchema, "scope");
          final Struct jsStruct = new Struct(jsSchema);
          final Struct scopeStruct = new Struct(scopeSchema);
          jsStruct.put("code", value.asJavaScriptWithScope().getCode());

          final BsonDocument scopeDoc = value.asJavaScriptWithScope().getScope().asDocument();
          for (Entry<String, BsonValue> scopeEntry : scopeDoc.entrySet()) {
            // The scope entries live in scopeStruct, so nested lookups must go against the
            // scope schema rather than the enclosing code/scope schema.
            convertFieldValue(scopeEntry, scopeStruct, scopeSchema);
          }

          jsStruct.put("scope", scopeStruct);
          colValue = jsStruct;
          break;
        }

      case REGULAR_EXPRESSION:
        {
          final Struct regexStruct = new Struct(requiredFieldSchema(schema, key));
          regexStruct.put("regex", value.asRegularExpression().getPattern());
          regexStruct.put("options", value.asRegularExpression().getOptions());
          colValue = regexStruct;
          break;
        }

      case TIMESTAMP:
        // Scaled by 1000 to obtain the millisecond value java.util.Date expects.
        colValue = new Date(1000L * value.asTimestamp().getTime());
        break;

      case DECIMAL128:
        colValue = value.asDecimal128().getValue().toString();
        break;

      case DOCUMENT:
        {
          final Schema documentSchema = requiredFieldSchema(schema, key);
          final Struct documentStruct = new Struct(documentSchema);
          for (Entry<String, BsonValue> nested : value.asDocument().entrySet()) {
            convertFieldValue(nested, documentStruct, documentSchema);
          }
          colValue = documentStruct;
          break;
        }

      case ARRAY:
        {
          final BsonArray bsonArray = value.asArray();
          switch (arrayEncoding) {
            case ARRAY:
              final List<Object> convertedList = Lists.newArrayList();
              if (!bsonArray.isEmpty()) {
                // The first element's type is applied to every element of the array.
                final BsonType elementType = bsonArray.get(0).getBsonType();
                // Only nested arrays/documents need a schema; it is the same for every element,
                // so resolve it once instead of per element.
                Schema elementSchema = null;
                if (elementType == BsonType.ARRAY || elementType == BsonType.DOCUMENT) {
                  elementSchema = requiredFieldSchema(schema, key).valueSchema();
                }
                for (BsonValue element : bsonArray.getValues()) {
                  convertFieldValue(elementSchema, elementType, element, convertedList);
                }
              }
              colValue = convertedList;
              break;

            case DOCUMENT:
              final Schema arraySchema = requiredFieldSchema(schema, key);
              final Struct arrayStruct = new Struct(arraySchema);
              // Each element becomes a struct field named after its index; an empty array
              // yields an empty struct.
              final Map<String, BsonValue> elementsByName = Maps.newHashMap();
              for (int i = 0; i < bsonArray.size(); i++) {
                elementsByName.put(arrayElementStructName(i), bsonArray.get(i));
              }
              for (Entry<String, BsonValue> element : elementsByName.entrySet()) {
                convertFieldValue(element, arrayStruct, arraySchema);
              }
              colValue = arrayStruct;
              break;

            default:
              // Any other encoding leaves the value as null.
              break;
          }
          break;
        }

      default:
        // No conversion defined for this BSON type: leave the struct untouched.
        return;
    }

    struct.put(key, colValue);
  }

  /**
   * Returns the schema of field {@code key} within {@code schema}.
   *
   * @throws DataException if {@code schema} has no field named {@code key}
   */
  private Schema requiredFieldSchema(Schema schema, String key) {
    final Field field = schema.field(key);
    if (field == null) {
      throw new DataException("Failed to find field '" + key + "' in schema " + schema.name());
    }
    return field.schema();
  }