private static Object fromConnectData()

in avro-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/avrodata/AvroData.java [362:595]


    private static Object fromConnectData(
            Schema schema, org.apache.avro.Schema avroSchema,
            Object logicalValue, boolean requireContainer,
            boolean requireSchemalessContainerNull, boolean enhancedSchemaSupport
    ) {
        Schema.Type schemaType = schema != null
                ? schema.type()
                : schemaTypeForSchemalessJavaType(logicalValue);
        if (schemaType == null) {
            // Schemaless null data since schema is null and we got a null schema type from the value
            if (requireSchemalessContainerNull) {
                return new GenericRecordBuilder(ANYTHING_SCHEMA).build();
            } else {
                return null;
            }
        }

        validateSchemaValue(schema, logicalValue);

        if (logicalValue == null) {
            // But if this is schemaless, we may not be able to return null directly
            if (schema == null && requireSchemalessContainerNull) {
                return new GenericRecordBuilder(ANYTHING_SCHEMA).build();
            } else {
                return null;
            }
        }

        // If this is a logical type, convert it from the convenient Java type to the underlying
        // serializeable format
        Object value = logicalValue;
        if (schema != null && schema.name() != null) {
            LogicalTypeConverter logicalConverter = TO_AVRO_LOGICAL_CONVERTERS.get(schema.name());
            if (logicalConverter != null) {
                value = logicalConverter.convert(schema, logicalValue);
            }
        }

        try {
            switch (schemaType) {
                case INT8: {
                    Byte byteValue = (Byte) value; // Check for correct type
                    Integer convertedByteValue = byteValue == null ? null : byteValue.intValue();
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, convertedByteValue, ANYTHING_SCHEMA_INT_FIELD),
                            requireContainer);
                }
                case INT16: {
                    Short shortValue = (Short) value; // Check for correct type
                    Integer convertedShortValue = shortValue == null ? null : shortValue.intValue();
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, convertedShortValue, ANYTHING_SCHEMA_INT_FIELD),
                            requireContainer);
                }

                case INT32:
                    Integer intValue = (Integer) value; // Check for correct type
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_INT_FIELD),
                            requireContainer);
                case INT64:
                    Long longValue = (Long) value; // Check for correct type
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_LONG_FIELD),
                            requireContainer);
                case FLOAT32:
                    Float floatValue = (Float) value; // Check for correct type
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_FLOAT_FIELD),
                            requireContainer);
                case FLOAT64:
                    Double doubleValue = (Double) value; // Check for correct type
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_DOUBLE_FIELD),
                            requireContainer);
                case BOOLEAN:
                    Boolean boolValue = (Boolean) value; // Check for correct type
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_BOOLEAN_FIELD),
                            requireContainer);
                case STRING:
                    if (enhancedSchemaSupport && schema != null && schema.parameters() != null
                            && schema.parameters().containsKey(AVRO_TYPE_ENUM)) {
                        String enumSchemaName = schema.parameters().get(AVRO_TYPE_ENUM);
                        org.apache.avro.Schema enumSchema;
                        if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) {
                            int enumIndex = avroSchema.getIndexNamed(enumSchemaName);
                            enumSchema = avroSchema.getTypes().get(enumIndex);
                        } else {
                            enumSchema = avroSchema;
                        }
                        value = new GenericData.EnumSymbol(enumSchema, (String) value);
                    } else {
                        String stringValue = (String) value; // Check for correct type
                    }
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_STRING_FIELD),
                            requireContainer);

                case BYTES: {
                    ByteBuffer bytesValue = value instanceof byte[] ? ByteBuffer.wrap((byte[]) value) :
                            (ByteBuffer) value;
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, bytesValue, ANYTHING_SCHEMA_BYTES_FIELD),
                            requireContainer);
                }

                case ARRAY: {
                    Collection<Object> list = (Collection<Object>) value;
                    List<Object> converted = new ArrayList<>(list.size());
                    Schema elementSchema = schema != null ? schema.valueSchema() : null;
                    org.apache.avro.Schema underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional(
                            schema, avroSchema);
                    org.apache.avro.Schema elementAvroSchema =
                            schema != null ? underlyingAvroSchema.getElementType() : ANYTHING_SCHEMA;
                    for (Object val : list) {
                        converted.add(
                                fromConnectData(
                                        elementSchema,
                                        elementAvroSchema,
                                        val,
                                        false,
                                        true,
                                        enhancedSchemaSupport
                                )
                        );
                    }
                    return maybeAddContainer(
                            avroSchema,
                            maybeWrapSchemaless(schema, converted, ANYTHING_SCHEMA_ARRAY_FIELD),
                            requireContainer);
                }

                case MAP: {
                    Map<Object, Object> map = (Map<Object, Object>) value;
                    org.apache.avro.Schema underlyingAvroSchema;
                    if (schema != null && schema.keySchema().type() == Schema.Type.STRING
                            && !schema.keySchema().isOptional()) {

                        underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional(schema, avroSchema);
                        Map<String, Object> converted = new HashMap<>();
                        for (Map.Entry<Object, Object> entry : map.entrySet()) {
                            // Key is a String, no conversion needed
                            Object convertedValue = fromConnectData(schema.valueSchema(),
                                    underlyingAvroSchema.getValueType(),
                                    entry.getValue(), false, true, enhancedSchemaSupport
                            );
                            converted.put((String) entry.getKey(), convertedValue);
                        }
                        return maybeAddContainer(avroSchema, converted, requireContainer);
                    } else {
                        List<GenericRecord> converted = new ArrayList<>(map.size());
                        underlyingAvroSchema = avroSchemaForUnderlyingMapEntryType(schema, avroSchema);
                        org.apache.avro.Schema elementSchema =
                                schema != null
                                        ? underlyingAvroSchema.getElementType()
                                        : ANYTHING_SCHEMA_MAP_ELEMENT;
                        org.apache.avro.Schema avroKeySchema = elementSchema.getField(KEY_FIELD).schema();
                        org.apache.avro.Schema avroValueSchema = elementSchema.getField(VALUE_FIELD).schema();
                        for (Map.Entry<Object, Object> entry : map.entrySet()) {
                            Object keyConverted = fromConnectData(schema != null ? schema.keySchema() : null,
                                    avroKeySchema, entry.getKey(), false, true,
                                    enhancedSchemaSupport);
                            Object valueConverted = fromConnectData(schema != null ? schema.valueSchema() : null,
                                    avroValueSchema, entry.getValue(), false,
                                    true, enhancedSchemaSupport);
                            converted.add(
                                    new GenericRecordBuilder(elementSchema)
                                            .set(KEY_FIELD, keyConverted)
                                            .set(VALUE_FIELD, valueConverted)
                                            .build()
                            );
                        }
                        return maybeAddContainer(
                                avroSchema, maybeWrapSchemaless(schema, converted, ANYTHING_SCHEMA_MAP_FIELD),
                                requireContainer);
                    }
                }

                case STRUCT: {
                    Struct struct = (Struct) value;
                    if (!struct.schema().equals(schema)) {
                        throw new DataException("Mismatching struct schema");
                    }
                    //This handles the inverting of a union which is held as a struct, where each field is
                    // one of the union types.
                    if (AVRO_TYPE_UNION.equals(schema.name())) {
                        for (Field field : schema.fields()) {
                            Object object = struct.get(field);
                            if (object != null) {
                                return fromConnectData(
                                        field.schema(),
                                        avroSchema,
                                        object,
                                        false,
                                        true,
                                        enhancedSchemaSupport
                                );
                            }
                        }
                        return fromConnectData(schema, avroSchema, null, false, true, enhancedSchemaSupport);
                    } else {
                        org.apache.avro.Schema underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional(
                                schema, avroSchema);
                        GenericRecordBuilder convertedBuilder = new GenericRecordBuilder(underlyingAvroSchema);
                        for (Field field : schema.fields()) {
                            org.apache.avro.Schema.Field theField = underlyingAvroSchema.getField(field.name());
                            org.apache.avro.Schema fieldAvroSchema = theField.schema();
                            convertedBuilder.set(
                                    field.name(),
                                    fromConnectData(field.schema(), fieldAvroSchema, struct.get(field), false,
                                            true, enhancedSchemaSupport)
                            );
                        }
                        return convertedBuilder.build();
                    }
                }

                default:
                    throw new DataException("Unknown schema type: " + schema.type());
            }
        } catch (ClassCastException e) {
            throw new DataException("Invalid type for " + schema.type() + ": " + value.getClass());
        }
    }