private Object toConnectValue()

in java/avro-converter/src/main/java/com/microsoft/azure/schemaregistry/kafka/connect/avro/AvroConverterUtils.java [174:457]


    /**
     * Converts a deserialized Avro value into its Kafka Connect representation.
     *
     * <p>When {@code schema} is {@code null} the value must be an {@link IndexedRecord} laid out
     * like {@code ANYTHING_SCHEMA}; the first non-null member (checked in the order boolean,
     * bytes, double, float, int, long, string, array, map) is converted recursively. When a
     * schema is given, the value is converted according to {@code schema.type()} and, if the
     * schema name matches one of the Connect logical types (Date, Time, Timestamp, Decimal),
     * the raw value is then turned into the logical Java value.
     *
     * @param schema the Connect schema describing {@code value}, or {@code null} for schemaless data
     * @param value the Avro-side value; {@code null} and {@code JsonProperties.NULL_VALUE} map to
     *     {@code null}
     * @return the Connect value, or {@code null} for a null input or an all-null schemaless record
     * @throws DataException if the value does not have a Java type compatible with the schema
     */
    private Object toConnectValue(Schema schema, Object value) {
        if (value == null || value == JsonProperties.NULL_VALUE) {
            return null;
        }

        try {
            if (schema == null) {
                if (!(value instanceof IndexedRecord)) {
                    throw new DataException("Invalid Avro data for schemaless Connect data");
                }
                IndexedRecord recordValue = (IndexedRecord) value;

                // Probe each member of the "anything" record; the first non-null one wins.
                Object boolVal =
                    recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_BOOLEAN_FIELD).pos());
                if (boolVal != null) {
                    return toConnectValue(Schema.BOOLEAN_SCHEMA, boolVal);
                }

                Object bytesVal =
                    recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_BYTES_FIELD).pos());
                if (bytesVal != null) {
                    return toConnectValue(Schema.BYTES_SCHEMA, bytesVal);
                }

                Object doubleVal =
                    recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_DOUBLE_FIELD).pos());
                if (doubleVal != null) {
                    return toConnectValue(Schema.FLOAT64_SCHEMA, doubleVal);
                }

                Object floatVal =
                    recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_FLOAT_FIELD).pos());
                if (floatVal != null) {
                    return toConnectValue(Schema.FLOAT32_SCHEMA, floatVal);
                }

                Object intVal =
                    recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_INT_FIELD).pos());
                if (intVal != null) {
                    return toConnectValue(Schema.INT32_SCHEMA, intVal);
                }

                Object longVal =
                    recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_LONG_FIELD).pos());
                if (longVal != null) {
                    return toConnectValue(Schema.INT64_SCHEMA, longVal);
                }

                Object stringVal =
                    recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_STRING_FIELD).pos());
                if (stringVal != null) {
                    return toConnectValue(Schema.STRING_SCHEMA, stringVal);
                }

                Object arrayVal =
                    recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_ARRAY_FIELD).pos());
                if (arrayVal != null) {
                    if (!(arrayVal instanceof Collection)) {
                        throw new DataException("Expected Collection for schemaless array field but found "
                            + arrayVal.getClass().getName());
                    }
                    Collection<Object> arrayValCollection = (Collection<Object>) arrayVal;
                    List<Object> result = new ArrayList<>(arrayValCollection.size());
                    for (Object arrayValue : arrayValCollection) {
                        // Elements are themselves schemaless, so recurse without a schema.
                        result.add(toConnectValue((Schema) null, arrayValue));
                    }

                    return result;
                }

                Object mapVal =
                    recordValue.get(ANYTHING_SCHEMA.getField(ANYTHING_SCHEMA_MAP_FIELD).pos());
                if (mapVal != null) {
                    if (!(mapVal instanceof Collection)) {
                        throw new DataException(
                            "Expected List for schemaless map field but found " + mapVal.getClass().getName());
                    }
                    // Schemaless maps arrive as a collection of key/value entry records.
                    Collection<IndexedRecord> mapValueCollection = (Collection<IndexedRecord>) mapVal;
                    Map<Object, Object> result = new HashMap<>(mapValueCollection.size());
                    for (IndexedRecord mapValue : mapValueCollection) {
                        int avroKeyFieldIndex = mapValue.getSchema().getField(KEY_FIELD).pos();
                        int avroValueFieldIndex = mapValue.getSchema().getField(VALUE_FIELD).pos();
                        Object convertedKey = toConnectValue((Schema) null, mapValue.get(avroKeyFieldIndex));
                        Object convertedValue =
                            toConnectValue((Schema) null, mapValue.get(avroValueFieldIndex));
                        result.put(convertedKey, convertedValue);
                    }

                    return result;
                }

                // Every member of the record was null.
                return null;
            }

            Object converted = null;
            switch (schema.type()) {
                // The casts below double as type checks: a mismatch raises ClassCastException,
                // which the catch at the bottom turns into a DataException.
                case INT32: {
                    Integer intValue = (Integer) value;
                    converted = intValue;
                    break;
                }
                case INT64: {
                    Long longValue = (Long) value;
                    converted = longValue;
                    break;
                }
                case FLOAT32: {
                    Float floatValue = (Float) value;
                    converted = floatValue;
                    break;
                }
                case FLOAT64: {
                    Double doubleValue = (Double) value;
                    converted = doubleValue;
                    break;
                }
                case BOOLEAN: {
                    Boolean boolValue = (Boolean) value;
                    converted = boolValue;
                    break;
                }
                case INT8:
                    // The incoming value is an Integer; narrow it to a byte.
                    converted = ((Integer) value).byteValue();
                    break;
                case INT16:
                    // The incoming value is an Integer; narrow it to a short.
                    converted = ((Integer) value).shortValue();
                    break;
                case STRING:
                    if (value instanceof String) {
                        converted = value;
                    } else if (value instanceof CharSequence || value instanceof Enum) {
                        converted = value.toString();
                    } else {
                        throw new DataException(
                            "Invalid class for string type, expecting String or CharSequence, got "
                                + value.getClass());
                    }
                    break;
                case BYTES:
                    if (value instanceof byte[]) {
                        converted = ByteBuffer.wrap((byte[]) value);
                    } else if (value instanceof ByteBuffer) {
                        converted = value;
                    } else if (value instanceof GenericFixed) {
                        converted = ByteBuffer.wrap(((GenericFixed) value).bytes());
                    } else {
                        throw new DataException(
                            "Invalid class for bytes type, expecting byte[] or ByteBuffer, got "
                                + value.getClass());
                    }
                    break;
                case ARRAY: {
                    Schema valueSchema = schema.valueSchema();
                    Collection<Object> valueSchemaCollection = (Collection<Object>) value;
                    List<Object> arrayValue = new ArrayList<>(valueSchemaCollection.size());
                    for (Object elem : valueSchemaCollection) {
                        arrayValue.add(toConnectValue(valueSchema, elem));
                    }
                    converted = arrayValue;
                    break;
                }
                case MAP: {
                    Schema keySchema = schema.keySchema();
                    Schema valueSchema = schema.valueSchema();
                    if (keySchema != null && keySchema.type() == Schema.Type.STRING
                        && !keySchema.isOptional()) {
                        // Required string keys: the value is a real Map keyed by CharSequence.
                        Map<CharSequence, Object> valueSchemaMap = (Map<CharSequence, Object>) value;
                        Map<CharSequence, Object> mapValue = new HashMap<>(valueSchemaMap.size());
                        for (Map.Entry<CharSequence, Object> entry : valueSchemaMap.entrySet()) {
                            mapValue.put(entry.getKey().toString(),
                                toConnectValue(valueSchema, entry.getValue()));
                        }
                        converted = mapValue;
                    } else {
                        // Any other key schema: the value is a collection of key/value entry records.
                        Collection<IndexedRecord> original = (Collection<IndexedRecord>) value;
                        Map<Object, Object> mapValue = new HashMap<>(original.size());
                        for (IndexedRecord entry : original) {
                            int avroKeyFieldIndex = entry.getSchema().getField(KEY_FIELD).pos();
                            int avroValueFieldIndex = entry.getSchema().getField(VALUE_FIELD).pos();
                            Object convertedKey = toConnectValue(keySchema, entry.get(avroKeyFieldIndex));
                            Object convertedValue =
                                toConnectValue(valueSchema, entry.get(avroValueFieldIndex));
                            mapValue.put(convertedKey, convertedValue);
                        }
                        converted = mapValue;
                    }
                    break;
                }
                case STRUCT: {
                    if (AVRO_TYPE_UNION.equals(schema.name())) {
                        // Union struct: find the single member field whose schema matches the value.
                        Schema valueRecordSchema = null;
                        if (value instanceof IndexedRecord) {
                            IndexedRecord valueRecord = ((IndexedRecord) value);
                            valueRecordSchema = toConnectSchema(valueRecord.getSchema());
                        }
                        int index = 0;
                        for (Field field : schema.fields()) {
                            Schema fieldSchema = field.schema();
                            if (isInstanceOfAvroSchemaTypeForSimpleSchema(fieldSchema, value, index)
                                || (valueRecordSchema != null
                                    && schemaEquals(valueRecordSchema, fieldSchema))) {
                                converted = new Struct(schema).put(unionMemberFieldName(fieldSchema, index),
                                    toConnectValue(fieldSchema, value));
                                break;
                            }
                            index++;
                        }
                        if (converted == null) {
                            throw new DataException("Did not find matching union field for data");
                        }
                    } else if (value instanceof Map) {
                        Map<CharSequence, Object> original = (Map<CharSequence, Object>) value;
                        Struct result = new Struct(schema);
                        for (Field field : schema.fields()) {
                            String fieldName = field.name();
                            // Fall back to the field's schema default when the map has no entry.
                            Object convertedFieldValue = toConnectValue(field.schema(),
                                original.getOrDefault(fieldName, field.schema().defaultValue()));
                            result.put(field, convertedFieldValue);
                        }
                        // Returned directly, bypassing the logical-type step below (unlike the
                        // other struct branches).
                        return result;
                    } else {
                        IndexedRecord valueRecord = (IndexedRecord) value;
                        Struct structValue = new Struct(schema);
                        for (Field field : schema.fields()) {
                            String fieldName = field.name();
                            int avroFieldIndex = valueRecord.getSchema().getField(fieldName).pos();
                            Object convertedFieldValue =
                                toConnectValue(field.schema(), valueRecord.get(avroFieldIndex));
                            structValue.put(field, convertedFieldValue);
                        }
                        converted = structValue;
                    }
                    break;
                }
                default:
                    throw new DataException("Unknown Connect schema type: " + schema.type());
            }

            if (schema.name() != null) {
                // Locale.ROOT keeps the lower-casing independent of the JVM default locale
                // (e.g. Turkish dotless-i), so the switch labels below always match.
                String schemaNameLower = schema.name().toLowerCase(java.util.Locale.ROOT);

                // The logical conversions work from the raw input value, not from "converted".
                switch (schemaNameLower) {
                    case "org.apache.kafka.connect.data.date":
                        if (!(value instanceof Integer)) {
                            throw new DataException(
                                "Invalid type for Date, underlying representation should be int32 but was "
                                    + value.getClass());
                        }
                        converted = Date.toLogical(schema, (int) value);
                        break;
                    case "org.apache.kafka.connect.data.time":
                        if (!(value instanceof Integer)) {
                            throw new DataException(
                                "Invalid type for Time, underlying representation should be int32 but was "
                                    + value.getClass());
                        }
                        converted = Time.toLogical(schema, (int) value);
                        break;
                    case "org.apache.kafka.connect.data.timestamp":
                        if (!(value instanceof Long)) {
                            throw new DataException(
                                "Invalid type for Timestamp, underlying representation should be int64 but was "
                                    + value.getClass());
                        }
                        converted = Timestamp.toLogical(schema, (long) value);
                        break;
                    case "org.apache.kafka.connect.data.decimal":
                        if (value instanceof byte[]) {
                            converted = Decimal.toLogical(schema, (byte[]) value);
                        } else if (value instanceof ByteBuffer) {
                            // Copy only the readable window. ByteBuffer.array() would ignore
                            // position/limit/arrayOffset and fails for read-only or direct
                            // buffers. duplicate() leaves the caller's position untouched.
                            ByteBuffer readable = ((ByteBuffer) value).duplicate();
                            byte[] decimalBytes = new byte[readable.remaining()];
                            readable.get(decimalBytes);
                            converted = Decimal.toLogical(schema, decimalBytes);
                        } else if (value instanceof GenericFixed) {
                            // Accepted for consistency with the BYTES case above.
                            converted = Decimal.toLogical(schema, ((GenericFixed) value).bytes());
                        } else {
                            throw new DataException(
                                "Invalid type for Decimal, underlying representation should be bytes but was "
                                    + value.getClass());
                        }
                        break;
                    default:
                        break;
                }
            }

            return converted;
        } catch (ClassCastException e) {
            String schemaType = schema != null ? schema.type().toString() : "null";
            // Keep the original exception as the cause so the failing cast stays diagnosable.
            throw new DataException("Invalid type for " + schemaType + ": " + value.getClass(), e);
        }
    }