public Object fromConnectData()

in java/avro-converter/src/main/java/com/microsoft/azure/schemaregistry/kafka/connect/avro/AvroConverterUtils.java [660:868]


    /**
     * Converts a Connect value into the object representation used on the Avro side.
     *
     * <p>The Connect type is taken from {@code schema} when one is supplied; otherwise it is
     * derived from the runtime value via {@code getSchemaFromLogicalValue}. Values whose schema
     * carries a Connect logical-type name (Decimal, Date, Time, Timestamp) are first reduced to
     * their underlying representation, then converted according to the Connect type:
     * <ul>
     *   <li>INT8 / INT16 are widened to {@link Integer}; the other primitives are returned after a
     *       type-checking cast.</li>
     *   <li>STRING becomes an enum symbol when the schema has the {@code AVRO_TYPE_ENUM}
     *       parameter, otherwise the string itself.</li>
     *   <li>BYTES becomes a {@link ByteBuffer}, or a {@code GenericData.Fixed} when
     *       {@code isFixedSchema(schema)} holds.</li>
     *   <li>ARRAY becomes a {@link List} of recursively converted elements.</li>
     *   <li>MAP becomes a {@code Map<String, Object>} when the key schema is a non-optional
     *       STRING, otherwise a list of key/value records.</li>
     *   <li>STRUCT becomes a record built with {@code GenericRecordBuilder}; a struct whose
     *       schema name equals {@code AVRO_TYPE_UNION} is converted as its first non-null
     *       field.</li>
     * </ul>
     *
     * @param schema             Connect schema describing {@code logicalValue}; may be
     *                           {@code null} for schemaless data
     * @param avroSchema         Avro schema corresponding to {@code schema}
     * @param logicalValue       the Connect value to convert; may be {@code null}
     * @param buildGenericRecord when {@code true}, an absent value (or an undeterminable type) is
     *                           represented by an empty record of {@code ANYTHING_SCHEMA} instead
     *                           of {@code null}
     * @return the converted value, or {@code null} for an absent value when
     *         {@code buildGenericRecord} is {@code false}
     * @throws DataException if the value is {@code null} for a non-optional schema, does not have
     *                       the Java type required by the schema, is a struct whose schema differs
     *                       from {@code schema}, or is a fixed value with no matching union member
     */
    public Object fromConnectData(Schema schema, org.apache.avro.Schema avroSchema,
                                  Object logicalValue, boolean buildGenericRecord) {
        Schema.Type schemaType =
            schema != null ? schema.type() : getSchemaFromLogicalValue(logicalValue);

        if (schemaType == null) {
            return buildGenericRecord ? new GenericRecordBuilder(ANYTHING_SCHEMA).build() : null;
        }

        if (logicalValue == null) {
            // Guard against a null schema so a missing value never surfaces as an NPE.
            if (schema != null && !schema.isOptional()) {
                throw new DataException("Found null value for non-optional schema");
            }
            return buildGenericRecord ? new GenericRecordBuilder(ANYTHING_SCHEMA).build() : null;
        }

        Object bytesValue = logicalValue;

        // Reduce Connect logical types to the underlying value before the per-type conversion.
        if (schema != null && schema.name() != null) {
            switch (schema.name()) {
                case Decimal.LOGICAL_NAME:
                    if (!(bytesValue instanceof BigDecimal)) {
                        throw new DataException("Can't convert type for Decimal, expected BigDecimal but got "
                            + bytesValue.getClass());
                    }
                    bytesValue = Decimal.fromLogical(schema, (BigDecimal) logicalValue);
                    break;
                case Date.LOGICAL_NAME:
                    if (!(bytesValue instanceof java.util.Date)) {
                        throw new DataException(
                            "Can't convert type for Date, expected Date but got " + bytesValue.getClass());
                    }
                    bytesValue = Date.fromLogical(schema, (java.util.Date) logicalValue);
                    break;
                case Time.LOGICAL_NAME:
                    if (!(bytesValue instanceof java.util.Date)) {
                        throw new DataException(
                            "Can't convert type for Time, expected Date but got " + bytesValue.getClass());
                    }
                    bytesValue = Time.fromLogical(schema, (java.util.Date) logicalValue);
                    break;
                case Timestamp.LOGICAL_NAME:
                    if (!(bytesValue instanceof java.util.Date)) {
                        throw new DataException(
                            "Can't convert type for Timestamp, expected Date but got " + bytesValue.getClass());
                    }
                    bytesValue = Timestamp.fromLogical(schema, (java.util.Date) logicalValue);
                    break;
                default:
                    break;
            }
        }

        try {
            switch (schemaType) {
                // The casts below double as type validation: a mismatch raises
                // ClassCastException, which is translated to DataException at the bottom.
                case INT8:
                    return ((Byte) bytesValue).intValue();
                case INT16:
                    return ((Short) bytesValue).intValue();
                case INT32:
                    return (Integer) bytesValue;
                case INT64:
                    return (Long) bytesValue;
                case FLOAT32:
                    return (Float) bytesValue;
                case FLOAT64:
                    return (Double) bytesValue;
                case BOOLEAN:
                    return (Boolean) bytesValue;
                case STRING:
                    if (schema != null && schema.parameters() != null
                        && schema.parameters().containsKey(AVRO_TYPE_ENUM)) {
                        String enumSchemaName = schema.parameters().get(AVRO_TYPE_ENUM);
                        return enumSymbol(avroSchema, bytesValue, enumSchemaName);
                    }
                    return (String) bytesValue;
                case BYTES: {
                    ByteBuffer buffer = bytesValue instanceof byte[]
                        ? ByteBuffer.wrap((byte[]) bytesValue)
                        : (ByteBuffer) bytesValue;

                    if (schema == null || !isFixedSchema(schema)) {
                        return buffer;
                    }

                    int size = Integer.parseInt(schema.parameters().get(CONNECT_AVRO_FIXED_SIZE_PROP));
                    org.apache.avro.Schema fixedSchema = null;
                    if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) {
                        // Select the union member that is a fixed of the right size and name;
                        // when several members match, the last one wins.
                        int index = 0;
                        for (org.apache.avro.Schema memberSchema : avroSchema.getTypes()) {
                            if (memberSchema.getType() == org.apache.avro.Schema.Type.FIXED
                                && memberSchema.getFixedSize() == size
                                && unionMemberFieldName(memberSchema, index)
                                .equals(unionMemberFieldName(schema, index))) {
                                fixedSchema = memberSchema;
                            }
                            index++;
                        }
                        if (fixedSchema == null) {
                            throw new DataException("Fixed size " + size + " not found in union " + avroSchema);
                        }
                    } else {
                        fixedSchema = avroSchema;
                    }

                    // Copy only the readable window of the buffer. ByteBuffer.array() would ignore
                    // position/limit/arrayOffset and fails for read-only or direct buffers.
                    // duplicate() keeps the caller's buffer position untouched.
                    ByteBuffer readableView = buffer.duplicate();
                    byte[] fixedBytes = new byte[readableView.remaining()];
                    readableView.get(fixedBytes);
                    return new GenericData.Fixed(fixedSchema, fixedBytes);
                }
                case ARRAY: {
                    Collection<?> elements = (Collection<?>) bytesValue;
                    List<Object> arrayValue = new ArrayList<>(elements.size());
                    Schema arrayElementSchema = schema != null ? schema.valueSchema() : null;
                    org.apache.avro.Schema underlyingAvroSchema = getAvroSchema(schema, avroSchema);
                    org.apache.avro.Schema arrayElementAvroSchema =
                        schema != null ? underlyingAvroSchema.getElementType() : ANYTHING_SCHEMA;
                    for (Object element : elements) {
                        arrayValue.add(
                            fromConnectData(arrayElementSchema, arrayElementAvroSchema, element, false));
                    }
                    return arrayValue;
                }
                case MAP: {
                    Map<?, ?> map = (Map<?, ?>) bytesValue;
                    if (schema != null && schema.keySchema().type() == Schema.Type.STRING
                        && !schema.keySchema().isOptional()) {
                        // Non-optional string keys map directly onto an Avro map.
                        org.apache.avro.Schema underlyingAvroSchema = getAvroSchema(schema, avroSchema);
                        Map<String, Object> mapValue = new HashMap<>();
                        for (Map.Entry<?, ?> entry : map.entrySet()) {
                            Object convertedValue = fromConnectData(schema.valueSchema(),
                                underlyingAvroSchema.getValueType(), entry.getValue(), false);
                            mapValue.put((String) entry.getKey(), convertedValue);
                        }
                        return mapValue;
                    }

                    // Any other key type (or schemaless data) is encoded as a list of
                    // key/value entry records.
                    List<GenericRecord> mapValue = new ArrayList<>(map.size());
                    org.apache.avro.Schema underlyingAvroSchema =
                        getAvroSchemaWithMapEntry(schema, avroSchema);
                    org.apache.avro.Schema elementSchema =
                        schema != null ? underlyingAvroSchema.getElementType()
                            : ANYTHING_SCHEMA_MAP_ELEMENT;
                    org.apache.avro.Schema avroKeySchema = elementSchema.getField(KEY_FIELD).schema();
                    org.apache.avro.Schema avroValueSchema = elementSchema.getField(VALUE_FIELD).schema();
                    Schema connectKeySchema = schema != null ? schema.keySchema() : null;
                    Schema connectValueSchema = schema != null ? schema.valueSchema() : null;
                    for (Map.Entry<?, ?> entry : map.entrySet()) {
                        Object keyConverted =
                            fromConnectData(connectKeySchema, avroKeySchema, entry.getKey(), false);
                        Object valueConverted =
                            fromConnectData(connectValueSchema, avroValueSchema, entry.getValue(), false);
                        mapValue.add(new GenericRecordBuilder(elementSchema).set(KEY_FIELD, keyConverted)
                            .set(VALUE_FIELD, valueConverted).build());
                    }
                    return mapValue;
                }
                case STRUCT: {
                    Struct struct = (Struct) bytesValue;
                    if (!struct.schema().equals(schema)) {
                        throw new DataException("Mismatching struct schema");
                    }

                    if (AVRO_TYPE_UNION.equals(schema.name())) {
                        // A union is carried as a struct with one field per branch; the first
                        // non-null field is the active branch.
                        for (Field field : schema.fields()) {
                            Object branchValue = struct.get(field);
                            if (branchValue != null) {
                                return fromConnectData(field.schema(), avroSchema, branchValue, false);
                            }
                        }
                        // No branch is set: delegate to the null-value handling above.
                        return fromConnectData(schema, avroSchema, null, true);
                    }

                    org.apache.avro.Schema underlyingAvroSchema = getAvroSchema(schema, avroSchema);
                    GenericRecordBuilder convertedBuilder = new GenericRecordBuilder(underlyingAvroSchema);
                    for (Field field : schema.fields()) {
                        String fieldName = field.name();
                        org.apache.avro.Schema fieldAvroSchema =
                            underlyingAvroSchema.getField(fieldName).schema();
                        convertedBuilder.set(fieldName,
                            fromConnectData(field.schema(), fieldAvroSchema, struct.get(field), false));
                    }
                    return convertedBuilder.build();
                }
                default:
                    // Use the resolved type: schema itself may be null for schemaless data.
                    throw new DataException("Unknown schema type: " + schemaType);
            }
        } catch (ClassCastException e) {
            // Report the resolved type (schema may be null) and keep the original cause.
            throw new DataException("Invalid type for " + schemaType + ": " + bytesValue.getClass(), e);
        }
    }