private static Object fromConnectData()

in schema-converter/avro-schema-converter/src/main/java/org/apache/rocketmq/schema/avro/AvroData.java [306:564]


    private static Object fromConnectData(
            Schema schema,
            org.apache.avro.Schema avroSchema,
            Object data,
            boolean requireContainer,
            boolean requireSchemalessContainerNull,
            boolean enhancedSchemaSupport
    ) {
        FieldType schemaType = schema != null
                ? schema.getFieldType()
                : schemaTypeForSchemalessJavaType(data);

        if (schemaType == null && requireSchemalessContainerNull) {
            if (requireSchemalessContainerNull) {
                return new GenericRecordBuilder(ANYTHING_SCHEMA).build();
            } else {
                return null;
            }
        }

        if (data == null && schema != null && !schema.isOptional()) {
            throw new ConnectException("Found null value for non-optional schema");
        }

        if (data == null) {
            if (schema == null && requireSchemalessContainerNull) {
                return new GenericRecordBuilder(ANYTHING_SCHEMA).build();
            } else {
                return null;
            }
        }

        Object value = data;
        if (schema != null && schema.getName() != null) {
            // Convert logical type data
            LogicalTypeConverter logicalConverter = TO_AVRO_LOGICAL_CONVERTERS.get(schema.getName());
            if (logicalConverter != null) {
                value = logicalConverter.convert(schema, value);
            }
        }

        try {
            switch (schemaType) {
                case INT8: {
                    Byte byteValue = (Byte) value;
                    Integer convertedByteValue = byteValue == null ? null : byteValue.intValue();
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, convertedByteValue, ANYTHING_SCHEMA_INT_FIELD),
                            requireContainer);
                }
                case INT16: {
                    Short shortValue = (Short) value;
                    Integer convertedShortValue = shortValue == null ? null : shortValue.intValue();
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, convertedShortValue, ANYTHING_SCHEMA_INT_FIELD),
                            requireContainer);
                }

                case INT32:
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_INT_FIELD),
                            requireContainer);
                case INT64:
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_LONG_FIELD),
                            requireContainer);
                case FLOAT32:
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_FLOAT_FIELD),
                            requireContainer);
                case FLOAT64:
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_DOUBLE_FIELD),
                            requireContainer);
                case BOOLEAN:
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_BOOLEAN_FIELD),
                            requireContainer);
                case STRING:
                    if (enhancedSchemaSupport && schema != null && schema.getParameters() != null && schema.getParameters().containsKey(AVRO_TYPE_ENUM)) {
                        String enumSchemaName = schema.getParameters().get(AVRO_TYPE_ENUM);
                        org.apache.avro.Schema enumSchema;
                        if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) {
                            int enumIndex = avroSchema.getIndexNamed(enumSchemaName);
                            enumSchema = avroSchema.getTypes().get(enumIndex);
                        } else {
                            enumSchema = avroSchema;
                        }
                        value = new GenericData.EnumSymbol(enumSchema, (String) value);
                    }

                    // string or enum
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_STRING_FIELD),
                            requireContainer);

                case BYTES: {
                    value = value instanceof byte[]
                            ? ByteBuffer.wrap((byte[]) value)
                            : (ByteBuffer) value;
                    if (schema != null && isFixedSchema(schema)) {
                        //Get fixed size
                        int size = Integer.parseInt(schema.getParameters().get(CONNECT_AVRO_FIXED_SIZE));
                        org.apache.avro.Schema fixedSchema = null;
                        if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) {
                            // union
                            for (org.apache.avro.Schema memberSchema : avroSchema.getTypes()) {
                                if (memberSchema.getType() == org.apache.avro.Schema.Type.FIXED
                                        && memberSchema.getFixedSize() == size
                                        && unionMemberFieldName(memberSchema, enhancedSchemaSupport)
                                        .equals(schema.getName())) {
                                    fixedSchema = memberSchema;
                                }
                            }

                            if (fixedSchema == null) {
                                throw new ConnectException("Fixed size " + size + " not in union " + avroSchema);
                            }
                        } else {
                            fixedSchema = avroSchema;
                        }
                        value = new GenericData.Fixed(fixedSchema, ((ByteBuffer) value).array());
                    }

                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_BYTES_FIELD),
                            requireContainer);
                }

                case ARRAY: {
                    Collection<Object> list = (Collection<Object>) value;
                    List<Object> converted = new ArrayList<>(list.size());
                    Schema elementSchema = schema != null ? schema.getValueSchema() : null;
                    org.apache.avro.Schema underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional(schema, avroSchema);
                    org.apache.avro.Schema elementAvroSchema = schema != null ? underlyingAvroSchema.getElementType() : ANYTHING_SCHEMA;
                    for (Object val : list) {
                        converted.add(
                                fromConnectData(
                                        elementSchema,
                                        elementAvroSchema,
                                        val,
                                        false,
                                        true,
                                        enhancedSchemaSupport
                                )
                        );
                    }
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, converted, ANYTHING_SCHEMA_ARRAY_FIELD),
                            requireContainer);
                }

                case MAP: {
                    Map<Object, Object> map = (Map<Object, Object>) value;
                    org.apache.avro.Schema underlyingAvroSchema;
                    if (schema != null
                            && schema.getKeySchema().getFieldType() == FieldType.STRING
                            && !schema.getKeySchema().isOptional()) {

                        // TODO most types don't need a new converted object since types pass through
                        underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional(schema, avroSchema);
                        Map<String, Object> converted = new HashMap<>();
                        for (Map.Entry<Object, Object> entry : map.entrySet()) {
                            // Key is a String, no conversion needed
                            Object convertedValue = fromConnectData(
                                    schema.getValueSchema(),
                                    underlyingAvroSchema.getValueType(),
                                    entry.getValue(),
                                    false,
                                    true,
                                    enhancedSchemaSupport
                            );
                            converted.put((String) entry.getKey(), convertedValue);
                        }
                        return maybeAddContainer(avroSchema, converted, requireContainer);
                    } else {
                        List<GenericRecord> converted = new ArrayList<>(map.size());
                        underlyingAvroSchema = avroSchemaForUnderlyingMapEntryType(schema, avroSchema);
                        org.apache.avro.Schema elementSchema =
                                schema != null
                                        ? underlyingAvroSchema.getElementType()
                                        : ANYTHING_SCHEMA_MAP_ELEMENT;
                        org.apache.avro.Schema avroKeySchema = elementSchema.getField(KEY_FIELD).schema();
                        org.apache.avro.Schema avroValueSchema = elementSchema.getField(VALUE_FIELD).schema();
                        for (Map.Entry<Object, Object> entry : map.entrySet()) {
                            Object keyConverted = fromConnectData(schema != null ? schema.getKeySchema() : null,
                                    avroKeySchema, entry.getKey(), false, true,
                                    enhancedSchemaSupport);
                            Object valueConverted = fromConnectData(schema != null ? schema.getValueSchema() : null,
                                    avroValueSchema, entry.getValue(), false,
                                    true, enhancedSchemaSupport);
                            converted.add(
                                    new GenericRecordBuilder(elementSchema)
                                            .set(KEY_FIELD, keyConverted)
                                            .set(VALUE_FIELD, valueConverted)
                                            .build()
                            );
                        }
                        return maybeAddContainer(
                                avroSchema, maybeWrapSchemaless(schema, converted, ANYTHING_SCHEMA_MAP_FIELD),
                                requireContainer);
                    }
                }

                case STRUCT: {
                    Struct struct = (Struct) value;
                    if (!struct.schema().equals(schema)) {
                        throw new ConnectException("Mismatching struct schema");
                    }
                    //This handles the inverting of a union which is held as a struct, where each field is
                    // one of the union types.
                    if (AVRO_TYPE_UNION.equals(schema.getName())) {
                        for (Field field : schema.getFields()) {
                            Object object = struct.get(field);
                            if (object != null) {
                                return fromConnectData(
                                        field.getSchema(),
                                        avroSchema,
                                        object,
                                        false,
                                        true,
                                        enhancedSchemaSupport
                                );
                            }
                        }
                        return fromConnectData(schema, avroSchema, null, false, true, enhancedSchemaSupport);
                    } else {
                        org.apache.avro.Schema underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional(
                                schema, avroSchema);
                        GenericRecordBuilder convertedBuilder = new GenericRecordBuilder(underlyingAvroSchema);
                        for (Field field : schema.getFields()) {
                            org.apache.avro.Schema.Field theField = underlyingAvroSchema.getField(field.getName());
                            org.apache.avro.Schema fieldAvroSchema = theField.schema();
                            convertedBuilder.set(
                                    field.getName(),
                                    fromConnectData(field.getSchema(), fieldAvroSchema, struct.get(field), false,
                                            true, enhancedSchemaSupport)
                            );
                        }
                        return convertedBuilder.build();
                    }
                }
                default:
                    throw new ConnectException("Unknown schema type: " + schema.getFieldType());
            }
        } catch (ClassCastException e) {
            throw new ConnectException("Invalid type for " + schema.getFieldType() + ": " + value.getClass());
        }
    }