public Schema toConnectSchema()

in java/avro-converter/src/main/java/com/microsoft/azure/schemaregistry/kafka/connect/avro/AvroConverterUtils.java [464:650]


    public Schema toConnectSchema(org.apache.avro.Schema schema) {
        String type = schema.getProp(CONNECT_TYPE_PROP);
        String logicalType = schema.getProp(AVRO_LOGICAL_TYPE_PROP);

        final SchemaBuilder builder;

        switch (schema.getType()) {
            case BOOLEAN:
                builder = SchemaBuilder.bool();
                break;
            case BYTES:
            case FIXED:
                if (AVRO_LOGICAL_DECIMAL.equalsIgnoreCase(logicalType)) {
                    Object scaleNode = schema.getObjectProp(AVRO_LOGICAL_DECIMAL_SCALE_PROP);
                    int scale = scaleNode instanceof Number ? ((Number) scaleNode).intValue() : 0;
                    builder = Decimal.builder(scale);

                    Object precisionNode = schema.getObjectProp(AVRO_LOGICAL_DECIMAL_PRECISION_PROP);
                    if (null != precisionNode) {
                        if (!(precisionNode instanceof Number)) {
                            throw new DataException(AVRO_LOGICAL_DECIMAL_PRECISION_PROP + " is not an integer.");
                        }
                        int percision = ((Number) precisionNode).intValue();
                        if (percision != CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT) {
                            builder.parameter(CONNECT_AVRO_DECIMAL_PRECISION_PROP, String.valueOf(percision));
                        }
                    }
                } else {
                    builder = SchemaBuilder.bytes();
                }
                if (schema.getType() == org.apache.avro.Schema.Type.FIXED) {
                    builder.parameter(CONNECT_AVRO_FIXED_SIZE_PROP, String.valueOf(schema.getFixedSize()));
                }
                break;
            case DOUBLE:
                builder = SchemaBuilder.float64();
                break;
            case FLOAT:
                builder = SchemaBuilder.float32();
                break;
            case INT:
                if (type == null && logicalType == null) {
                    builder = SchemaBuilder.int32();
                } else if (logicalType != null) {
                    if (AVRO_LOGICAL_DATE.equalsIgnoreCase(logicalType)) {
                        builder = Date.builder();
                    } else if (AVRO_LOGICAL_TIME_MILLIS.equalsIgnoreCase(logicalType)) {
                        builder = Time.builder();
                    } else {
                        builder = SchemaBuilder.int32();
                    }
                } else {
                    Schema.Type connectType = NON_AVRO_TYPES_BY_TYPE_CODE.get(type);
                    if (connectType == null) {
                        throw new DataException("Connect type for Avro int field is not found");
                    }
                    builder = SchemaBuilder.type(connectType);
                }
                break;
            case LONG:
                if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) {
                    builder = Timestamp.builder();
                } else {
                    builder = SchemaBuilder.int64();
                }
                break;
            case STRING:
                builder = SchemaBuilder.string();
                break;
            case ARRAY:
                org.apache.avro.Schema arrayElementSchema = schema.getElementType();
                if (isMapEntry(arrayElementSchema)) {
                    if (arrayElementSchema.getFields().size() != 2
                        || arrayElementSchema.getField(KEY_FIELD) == null
                        || arrayElementSchema.getField(VALUE_FIELD) == null) {
                        throw new DataException("Expected array of key-value pairs");
                    }
                    builder =
                        SchemaBuilder.map(toConnectSchema(arrayElementSchema.getField(KEY_FIELD).schema()),
                            toConnectSchema(arrayElementSchema.getField(VALUE_FIELD).schema()));
                } else {
                    Schema arraySchema = toConnectSchema(schema.getElementType());
                    builder = SchemaBuilder.array(arraySchema);
                }
                break;
            case MAP:
                builder = SchemaBuilder.map(Schema.STRING_SCHEMA, toConnectSchema(schema.getValueType()));
                break;
            case RECORD: {
                builder = SchemaBuilder.struct();
                for (org.apache.avro.Schema.Field field : schema.getFields()) {
                    Schema fieldSchema = toConnectSchema(field.schema());
                    builder.field(field.name(), fieldSchema);
                }
                break;
            }
            case ENUM:
                builder = SchemaBuilder.string();
                String paramName = AVRO_TYPE_ENUM;
                builder.parameter(paramName, schema.getFullName());
                for (String enumSymbol : schema.getEnumSymbols()) {
                    builder.parameter(paramName + "." + enumSymbol, enumSymbol);
                }
                break;
            case UNION: {
                if (schema.getTypes().size() == 2) {
                    if (schema.getTypes().contains(NULL_AVRO_SCHEMA)) {
                        for (org.apache.avro.Schema memberSchema : schema.getTypes()) {
                            if (!memberSchema.equals(NULL_AVRO_SCHEMA)) {
                                return toConnectSchema(memberSchema);
                            }
                        }
                    }
                }
                String unionName = AVRO_TYPE_UNION;
                builder = SchemaBuilder.struct().name(unionName);
                Set<String> fieldNames = new HashSet<>();
                int fieldIndex = 0;
                for (org.apache.avro.Schema memberSchema : schema.getTypes()) {
                    if (memberSchema.getType() == org.apache.avro.Schema.Type.NULL) {
                        builder.optional();
                    } else {
                        String fieldName = unionMemberFieldName(memberSchema, fieldIndex);
                        if (fieldNames.contains(fieldName)) {
                            throw new DataException("Multiple union schemas map to the Connect union field name");
                        }
                        fieldNames.add(fieldName);
                        builder.field(fieldName, toConnectSchema(memberSchema));
                    }
                    fieldIndex++;
                }
                break;
            }
            case NULL:
                throw new DataException("Null schemas are not supported");

            default:
                throw new DataException("Unsupported schema type: " + schema.getType().getName());
        }

        Object parameters = schema.getObjectProp(CONNECT_PARAMETERS_PROP);
        if (parameters != null) {
            if (!(parameters instanceof Map)) {
                throw new DataException("Expected JSON object for schema parameters. Got: " + parameters);
            }
            Map<String, Object> params = (Map<String, Object>) parameters;
            for (Map.Entry<String, Object> entry : params.entrySet()) {
                Object jsonValue = entry.getValue();
                if (!(jsonValue instanceof String)) {
                    throw new DataException("Schema parameter value is not a string. Got: " + jsonValue);
                }
                builder.parameter(entry.getKey(), (String) jsonValue);
            }
        }

        for (Map.Entry<String, Object> entry : schema.getObjectProps().entrySet()) {
            if (entry.getKey().startsWith(AVRO_PROP)) {
                builder.parameter(entry.getKey(), entry.getValue().toString());
            }
        }

        Object connectNameJson = schema.getObjectProp(CONNECT_NAME_PROP);
        String name = null;
        if (connectNameJson != null) {
            if (!(connectNameJson instanceof String)) {
                throw new DataException("Invalid schema name: " + connectNameJson.toString());
            }
            name = (String) connectNameJson;

        } else if (schema.getType() == org.apache.avro.Schema.Type.RECORD
            || schema.getType() == org.apache.avro.Schema.Type.ENUM
            || schema.getType() == org.apache.avro.Schema.Type.FIXED) {
            name = schema.getFullName();
        }
        if (name != null && !name.startsWith(DEFAULT_SCHEMA_FULL_NAME)) {
            if (builder.name() != null) {
                if (!name.equals(builder.name())) {
                    throw new DataException("Schema name that has already been added to SchemaBuilder ("
                        + builder.name() + ") differs from name in source schema (" + name + ")");
                }
            } else {
                builder.name(name);
            }
        }

        return builder.build();
    }