public void addFieldSchema()

in kafka-connect/kafka-connect-transforms/src/main/java/org/debezium/connector/mongodb/transforms/MongoDataConverter.java [276:389]


  /**
   * Adds one field, named after the entry's key, to {@code builder}. The field's schema is chosen
   * from the BSON type of the entry's value:
   *
   * <ul>
   *   <li>{@code NULL}, {@code STRING}, {@code JAVASCRIPT}, {@code OBJECT_ID}, {@code DECIMAL128}:
   *       optional string
   *   <li>{@code DOUBLE}, {@code BINARY}, {@code INT32}, {@code INT64}, {@code BOOLEAN}: the
   *       matching optional primitive schema
   *   <li>{@code DATE_TIME}, {@code TIMESTAMP}: optional Connect {@code Timestamp}
   *   <li>{@code JAVASCRIPT_WITH_SCOPE}: struct with a {@code code} string and an optional {@code
   *       scope} struct built recursively from the scope document
   *   <li>{@code REGULAR_EXPRESSION}: optional struct with {@code regex} and {@code options}
   *       strings
   *   <li>{@code DOCUMENT}: optional struct built recursively from the nested document
   *   <li>{@code ARRAY}: depends on {@code arrayEncoding}; either an optional array schema or an
   *       optional struct with one field per element
   * </ul>
   *
   * <p>Any other BSON type is ignored: no field is added to {@code builder}.
   *
   * @param keyValuesForSchema field name and the BSON value whose type determines the schema
   * @param builder struct schema builder that receives the new field; nested struct schemas are
   *     named {@code builder.name() + "." + key}
   */
  public void addFieldSchema(Entry<String, BsonValue> keyValuesForSchema, SchemaBuilder builder) {
    String key = keyValuesForSchema.getKey();
    BsonValue bsonValue = keyValuesForSchema.getValue();
    BsonType type = bsonValue.getBsonType();

    switch (type) {
      case NULL:
      case STRING:
      case JAVASCRIPT:
      case OBJECT_ID:
      case DECIMAL128:
        builder.field(key, Schema.OPTIONAL_STRING_SCHEMA);
        break;

      case DOUBLE:
        builder.field(key, Schema.OPTIONAL_FLOAT64_SCHEMA);
        break;

      case BINARY:
        builder.field(key, Schema.OPTIONAL_BYTES_SCHEMA);
        break;

      case INT32:
        builder.field(key, Schema.OPTIONAL_INT32_SCHEMA);
        break;

      case INT64:
        builder.field(key, Schema.OPTIONAL_INT64_SCHEMA);
        break;

      case DATE_TIME:
      case TIMESTAMP:
        builder.field(key, org.apache.kafka.connect.data.Timestamp.builder().optional().build());
        break;

      case BOOLEAN:
        builder.field(key, Schema.OPTIONAL_BOOLEAN_SCHEMA);
        break;

      case JAVASCRIPT_WITH_SCOPE:
        {
          // Note: unlike the other nested structs, this one is not marked optional.
          SchemaBuilder jsWithScope = SchemaBuilder.struct().name(builder.name() + "." + key);
          jsWithScope.field("code", Schema.OPTIONAL_STRING_SCHEMA);

          SchemaBuilder scope =
              SchemaBuilder.struct().name(jsWithScope.name() + ".scope").optional();
          BsonDocument scopeDocument = bsonValue.asJavaScriptWithScope().getScope().asDocument();
          for (Entry<String, BsonValue> scopeEntry : scopeDocument.entrySet()) {
            addFieldSchema(scopeEntry, scope);
          }
          jsWithScope.field("scope", scope.build());

          // Register the built schema rather than the mutable builder, as every other branch
          // does, so the field schema compares by value instead of by builder identity.
          builder.field(key, jsWithScope.build());
          break;
        }

      case REGULAR_EXPRESSION:
        {
          SchemaBuilder regex = SchemaBuilder.struct().name(SCHEMA_NAME_REGEX).optional();
          regex.field("regex", Schema.OPTIONAL_STRING_SCHEMA);
          regex.field("options", Schema.OPTIONAL_STRING_SCHEMA);
          builder.field(key, regex.build());
          break;
        }

      case DOCUMENT:
        {
          SchemaBuilder nested = SchemaBuilder.struct().name(builder.name() + "." + key).optional();
          for (Entry<String, BsonValue> nestedEntry : bsonValue.asDocument().entrySet()) {
            addFieldSchema(nestedEntry, nested);
          }
          builder.field(key, nested.build());
          break;
        }

      case ARRAY:
        {
          BsonArray array = bsonValue.asArray();
          switch (arrayEncoding) {
            case ARRAY:
              if (array.isEmpty()) {
                // No element to inspect, so fall back to an array of optional strings.
                builder.field(
                    key, SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build());
              } else {
                // The element schema is derived from the first element's type.
                // NOTE(review): testType presumably rejects arrays with mixed element types;
                // confirm against its definition.
                BsonType elementType = array.get(0).getBsonType();
                testType(builder, key, bsonValue, elementType);
                builder.field(
                    key,
                    SchemaBuilder.array(subSchema(builder, key, elementType, array))
                        .optional()
                        .build());
              }
              break;
            case DOCUMENT:
              {
                // An empty array simply yields a struct with no fields.
                SchemaBuilder arrayStructBuilder =
                    SchemaBuilder.struct().name(builder.name() + "." + key).optional();
                // Insertion-ordered so the struct's fields follow array index order; a plain
                // HashMap would order them by the hash of the generated element names.
                Map<String, BsonValue> elementsByName = Maps.newLinkedHashMap();
                for (int i = 0; i < array.size(); i++) {
                  elementsByName.put(arrayElementStructName(i), array.get(i));
                }
                for (Entry<String, BsonValue> element : elementsByName.entrySet()) {
                  addFieldSchema(element, arrayStructBuilder);
                }
                builder.field(key, arrayStructBuilder.build());
                break;
              }
          }
          break;
        }

      default:
        // Remaining BSON types contribute no field to the schema.
        break;
    }
  }