private Schema toConnectSchema()

in schema-converter/avro-schema-converter/src/main/java/org/apache/rocketmq/schema/avro/AvroData.java [1752:2044]


    private Schema toConnectSchema(org.apache.avro.Schema schema,
                                   boolean forceOptional,
                                   Object fieldDefaultVal,
                                   String docDefaultVal,
                                   Integer version,
                                   ToConnectContext toConnectContext) {

        String type = schema.getProp(CONNECT_TYPE);
        String logicalType = schema.getProp(AVRO_LOGICAL_TYPE_PROP);
        final SchemaBuilder builder;
        switch (schema.getType()) {
            case BOOLEAN:
                builder = SchemaBuilder.bool();
                break;
            case BYTES:
            case FIXED:
                if (AVRO_LOGICAL_DECIMAL.equalsIgnoreCase(logicalType)) {
                    Object scaleNode = schema.getObjectProp(AVRO_LOGICAL_DECIMAL_SCALE_PROP);
                    if (null == scaleNode || !(scaleNode instanceof Number)) {
                        throw new ConnectException("scale must be specified and must be a number.");
                    }
                    Number scale = (Number) scaleNode;
                    builder = Decimal.builder(scale.intValue());

                    Object precisionNode = schema.getObjectProp(AVRO_LOGICAL_DECIMAL_PRECISION_PROP);
                    if (null != precisionNode) {
                        if (!(precisionNode instanceof Number)) {
                            throw new ConnectException(AVRO_LOGICAL_DECIMAL_PRECISION_PROP
                                    + " property must be a JSON Integer."
                                    + " https://avro.apache.org/docs/1.9.1/spec.html#Decimal");
                        }
                        // Capture the precision as a parameter only if it is not the default
                        Integer precision = ((Number) precisionNode).intValue();
                        if (precision != CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT) {
                            builder.parameter(CONNECT_AVRO_DECIMAL_PRECISION_PROP, precision.toString());
                        }
                    }
                } else {
                    builder = SchemaBuilder.bytes();
                }
                if (schema.getType() == org.apache.avro.Schema.Type.FIXED) {
                    builder.parameter(CONNECT_AVRO_FIXED_SIZE, String.valueOf(schema.getFixedSize()));
                }
                break;
            case DOUBLE:
                builder = SchemaBuilder.float64();
                break;
            case FLOAT:
                builder = SchemaBuilder.float32();
                break;
            case INT:
                // INT is used for Connect's INT8, INT16, and INT32
                if (type == null && logicalType == null) {
                    builder = SchemaBuilder.int32();
                } else if (logicalType != null) {
                    if (AVRO_LOGICAL_DATE.equalsIgnoreCase(logicalType)) {
                        builder = Date.builder();
                    } else if (AVRO_LOGICAL_TIME_MILLIS.equalsIgnoreCase(logicalType)) {
                        builder = Time.builder();
                    } else {
                        builder = SchemaBuilder.int32();
                    }
                } else {
                    FieldType connectType = NON_AVRO_TYPES_BY_TYPE_CODE.get(type);
                    if (connectType == null) {
                        throw new ConnectException("Connect type annotation for Avro int field is null");
                    }
                    builder = new SchemaBuilder(connectType);
                }
                break;
            case LONG:
                if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) {
                    builder = Timestamp.builder();
                } else {
                    builder = SchemaBuilder.int64();
                }
                break;
            case STRING:
                builder = SchemaBuilder.string();
                break;

            case ARRAY:
                org.apache.avro.Schema elemSchema = schema.getElementType();
                // Special case for custom encoding of non-string maps as list of key-value records
                if (isMapEntry(elemSchema)) {
                    if (elemSchema.getFields().size() != 2
                            || elemSchema.getField(KEY_FIELD) == null
                            || elemSchema.getField(VALUE_FIELD) == null) {
                        throw new ConnectException("Found map encoded as array of key-value pairs, but array "
                                + "elements do not match the expected format.");
                    }
                    builder = SchemaBuilder.map(
                            toConnectSchema(elemSchema.getField(KEY_FIELD).schema()),
                            toConnectSchema(elemSchema.getField(VALUE_FIELD).schema())
                    );
                } else {
                    Schema arraySchema = toConnectSchemaWithCycles(
                            schema.getElementType(), getForceOptionalDefault(),
                            null, null, toConnectContext);
                    builder = SchemaBuilder.array(arraySchema);
                }
                break;

            case MAP:
                builder = SchemaBuilder.map(
                        SchemaBuilder.string().build(),
                        toConnectSchemaWithCycles(
                                schema.getValueType(),
                                getForceOptionalDefault(),
                                null,
                                null,
                                toConnectContext
                        )
                );
                break;

            case RECORD: {
                builder = SchemaBuilder.struct();
                toConnectContext.cycleReferences.put(schema, new CyclicSchemaWrapper(builder.build()));
                if (connectMetaData && schema.getDoc() != null) {
                    builder.parameter(AVRO_RECORD_DOC, schema.getDoc());
                }
                for (org.apache.avro.Schema.Field field : schema.getFields()) {
                    if (connectMetaData && field.doc() != null) {
                        builder.parameter(AVRO_FIELD_DOC_PREFIX + field.name(), field.doc());
                    }
                    Schema fieldSchema = toConnectSchema(field.schema(), getForceOptionalDefault(),
                            field.defaultVal(), field.doc(), toConnectContext);
                    builder.field(field.name(), fieldSchema);
                }
                break;
            }

            case ENUM:
                // enums are unwrapped to strings and the original enum is not preserved
                builder = SchemaBuilder.string();
                if (connectMetaData) {
                    if (schema.getDoc() != null) {
                        builder.parameter(AVRO_ENUM_DOC_PREFIX + schema.getName(), schema.getDoc());
                    }
                    if (schema.getEnumDefault() != null) {
                        builder.parameter(AVRO_ENUM_DEFAULT_PREFIX + schema.getName(),
                                schema.getEnumDefault());
                    }
                }
                builder.parameter(AVRO_TYPE_ENUM, schema.getFullName());
                for (String enumSymbol : schema.getEnumSymbols()) {
                    builder.parameter(AVRO_TYPE_ENUM + "." + enumSymbol, enumSymbol);
                }
                break;

            case UNION: {
                if (schema.getTypes().size() == 2) {
                    if (schema.getTypes().contains(NULL_SCHEMA)) {
                        for (org.apache.avro.Schema memberSchema : schema.getTypes()) {
                            if (!memberSchema.equals(NULL_SCHEMA)) {
                                return toConnectSchemaWithCycles(
                                        memberSchema, true, null, docDefaultVal, toConnectContext);
                            }
                        }
                    }
                }
                builder = SchemaBuilder.struct().name(AVRO_TYPE_UNION);
                Set<String> fieldNames = new HashSet<>();
                for (org.apache.avro.Schema memberSchema : schema.getTypes()) {
                    if (memberSchema.getType() == org.apache.avro.Schema.Type.NULL) {
                        builder.optional();
                    } else {
                        String fieldName = unionMemberFieldName(memberSchema, enhancedSchemaSupport);
                        if (fieldNames.contains(fieldName)) {
                            throw new ConnectException("Multiple union schemas map to the Connect union field name");
                        }
                        fieldNames.add(fieldName);
                        builder.field(
                                fieldName,
                                toConnectSchemaWithCycles(memberSchema, true, null, null, toConnectContext)
                        );
                    }
                }
                break;
            }

            case NULL:
                throw new ConnectException("Standalone null schemas are not supported by this converter");

            default:
                throw new ConnectException("Couldn't translate unsupported schema type "
                        + schema.getType().getName() + ".");
        }

        String docVal = schema.getProp(CONNECT_DOC);
        if (connectMetaData && docVal != null) {
            builder.doc(docVal);
        }

        // A valid version must be a positive integer (assumed throughout SR)
        int versionInt = -1;
        Object versionNode = schema.getObjectProp(CONNECT_VERSION);
        if (versionNode != null) {
            if (!(versionNode instanceof Number)) {
                throw new ConnectException("Invalid Connect version found: " + versionNode);
            }
            versionInt = ((Number) versionNode).intValue();
        } else if (version != null) {
            versionInt = version.intValue();
        }
        if (versionInt >= 0) {
            if (builder.build().getVersion() != null) {
                if (versionInt != builder.build().getVersion()) {
                    throw new ConnectException("Mismatched versions: version already added to SchemaBuilder "
                            + "("
                            + builder.build().getVersion()
                            + ") differs from version in source schema ("
                            + versionInt
                            + ")");
                }
            } else {
                builder.version(versionInt);
            }
        }

        Object parameters = schema.getObjectProp(CONNECT_PARAMETERS);
        if (connectMetaData && parameters != null) {
            if (!(parameters instanceof Map)) {
                throw new ConnectException("Expected JSON object for schema parameters but found: "
                        + parameters);
            }
            Iterator<Map.Entry<String, Object>> paramIt =
                    ((Map<String, Object>) parameters).entrySet().iterator();
            while (paramIt.hasNext()) {
                Map.Entry<String, Object> field = paramIt.next();
                Object jsonValue = field.getValue();
                if (!(jsonValue instanceof String)) {
                    throw new ConnectException("Expected schema parameter values to be strings but found: "
                            + jsonValue);
                }
                builder.parameter(field.getKey(), (String) jsonValue);
            }
        }

        for (Map.Entry<String, Object> entry : schema.getObjectProps().entrySet()) {
            if (entry.getKey().startsWith(AVRO_PROP)) {
                builder.parameter(entry.getKey(), entry.getValue().toString());
            }
        }

        Object connectDefault = schema.getObjectProp(CONNECT_DEFAULT_VALUE);
        if (fieldDefaultVal == null) {
            fieldDefaultVal = JacksonUtils.toJsonNode(connectDefault);
        } else if (connectDefault == null) {
            builder.parameter(AVRO_FIELD_DEFAULT_FLAG, "true");
        }
        if (fieldDefaultVal != null) {
            builder.defaultValue(
                    defaultValueFromAvro(builder.build(), schema, fieldDefaultVal, toConnectContext));
        }

        Object connectNameJson = schema.getObjectProp(CONNECT_NAME);
        String name = null;
        if (connectNameJson != null) {
            if (!(connectNameJson instanceof String)) {
                throw new ConnectException("Invalid schema name: " + connectNameJson);
            }
            name = (String) connectNameJson;

        } else if (schema.getType() == org.apache.avro.Schema.Type.RECORD
                || schema.getType() == org.apache.avro.Schema.Type.ENUM
                || schema.getType() == org.apache.avro.Schema.Type.FIXED) {
            name = schema.getFullName();
        }
        if (name != null && !name.equals(DEFAULT_SCHEMA_FULL_NAME)) {
            if (builder.build().getName() != null) {
                if (!name.equals(builder.build().getName())) {
                    throw new ConnectException("Mismatched names: name already added to SchemaBuilder ("
                            + builder.build().getName()
                            + ") differs from name in source schema ("
                            + name + ")");
                }
            } else {
                builder.name(name);
            }
        }

        if (forceOptional) {
            builder.optional();
        }

        if (!toConnectContext.detectedCycles.contains(schema)) {
            toConnectContext.cycleReferences.remove(schema);
        }

        return builder.build();
    }