private Object toConnectData()

in schema-converter/avro-schema-converter/src/main/java/org/apache/rocketmq/schema/avro/AvroData.java [1421:1703]


    /**
     * Converts a value read from Avro into its Connect representation.
     *
     * <p>When {@code schema} is {@code null} the value is treated as schemaless data and is
     * unwrapped from its {@code ANYTHING_SCHEMA} record. Otherwise the value is converted
     * according to {@code schema.getFieldType()}, recursing into array elements, map entries
     * and struct fields.
     *
     * @param schema              Connect schema describing {@code value}, or {@code null} for
     *                            schemaless data
     * @param value               the Avro-side value to convert; may be {@code null}
     * @param toConnectContext    conversion context, passed through to nested conversions
     * @param doLogicalConversion whether to apply the converter registered in
     *                            {@code TO_CONNECT_LOGICAL_CONVERTERS} under the schema's name
     * @return the converted value, or {@code null} when {@code value} is {@code null}
     * @throws ConnectException if {@code value} is {@code null} for a non-optional schema, if the
     *                          value's runtime class does not fit the schema, if no union member
     *                          matches, or if the schema's field type is not handled
     */
    private Object toConnectData(Schema schema,
                                 Object value,
                                 ToConnectContext toConnectContext,
                                 boolean doLogicalConversion) {
        if (value == null) {
            if (schema != null && !schema.isOptional()) {
                throw new ConnectException("Found null value for non-optional schema");
            }
            return null;
        }
        try {
            // If we're decoding schemaless data, we need to unwrap it into just the single value
            if (schema == null) {
                return unwrapSchemalessData(value, toConnectContext);
            }

            Object converted;
            switch (schema.getFieldType()) {
                // Pass through types: the cast only validates the runtime class
                case INT32:
                    converted = (Integer) value;
                    break;
                case INT64:
                    converted = (Long) value;
                    break;
                case FLOAT32:
                    converted = (Float) value;
                    break;
                case FLOAT64:
                    converted = (Double) value;
                    break;
                case BOOLEAN:
                    converted = (Boolean) value;
                    break;

                case INT8:
                    // Encoded as an Integer
                    converted = ((Integer) value).byteValue();
                    break;
                case INT16:
                    // Encoded as an Integer
                    converted = ((Integer) value).shortValue();
                    break;

                case STRING:
                    if (value instanceof String) {
                        converted = value;
                    } else if (value instanceof CharSequence
                            || value instanceof GenericEnumSymbol
                            || value instanceof Enum) {
                        converted = value.toString();
                    } else {
                        throw new ConnectException("Invalid class for string type, expecting String or "
                                + "CharSequence but found " + value.getClass());
                    }
                    break;

                case BYTES:
                    if (value instanceof byte[]) {
                        converted = ByteBuffer.wrap((byte[]) value);
                    } else if (value instanceof ByteBuffer) {
                        converted = value;
                    } else if (value instanceof GenericFixed) {
                        converted = ByteBuffer.wrap(((GenericFixed) value).bytes());
                    } else {
                        throw new ConnectException("Invalid class for bytes type, expecting byte[] or ByteBuffer "
                                + "but found " + value.getClass());
                    }
                    break;

                case ARRAY: {
                    Schema valueSchema = schema.getValueSchema();
                    Collection<Object> original = (Collection<Object>) value;
                    List<Object> result = new ArrayList<>(original.size());
                    for (Object elem : original) {
                        result.add(toConnectData(valueSchema, elem, toConnectContext));
                    }
                    converted = result;
                    break;
                }

                case MAP: {
                    Schema keySchema = schema.getKeySchema();
                    Schema valueSchema = schema.getValueSchema();
                    if (keySchema != null && keySchema.getFieldType() == FieldType.STRING && !keySchema.isOptional()) {
                        // Non-optional string keys
                        Map<CharSequence, Object> original = (Map<CharSequence, Object>) value;
                        Map<CharSequence, Object> result = new HashMap<>(original.size());
                        for (Map.Entry<CharSequence, Object> entry : original.entrySet()) {
                            result.put(entry.getKey().toString(),
                                    toConnectData(valueSchema, entry.getValue(), toConnectContext));
                        }
                        converted = result;
                    } else {
                        // Arbitrary keys: the map arrives as a collection of key/value records
                        converted = toConnectMapFromEntries(
                                keySchema, valueSchema, (Collection<IndexedRecord>) value, toConnectContext);
                    }
                    break;
                }

                case STRUCT:
                    converted = toConnectStruct(schema, value, toConnectContext);
                    break;

                default:
                    throw new ConnectException("Unknown Connect schema type: " + schema.getFieldType());
            }

            if (schema.getName() != null && doLogicalConversion) {
                LogicalTypeConverter logicalConverter = TO_CONNECT_LOGICAL_CONVERTERS.get(schema.getName());
                if (logicalConverter != null) {
                    converted = logicalConverter.convert(schema, converted);
                }
            }
            return converted;
        } catch (ClassCastException e) {
            // Report the field type (always set) rather than the optional schema name, and keep
            // the original exception as the cause so the failing cast stays visible.
            String schemaType = schema != null ? String.valueOf(schema.getFieldType()) : "null";
            throw new ConnectException("Invalid type for " + schemaType + ": " + value.getClass(), e);
        }
    }

    /**
     * Unwraps schemaless data: returns the converted value of the first non-null field of the
     * {@code ANYTHING_SCHEMA} record, or {@code null} when no field is set.
     */
    private Object unwrapSchemalessData(Object value, ToConnectContext toConnectContext) {
        if (!(value instanceof IndexedRecord)) {
            throw new ConnectException("Invalid Avro data for schemaless Connect data");
        }
        IndexedRecord recordValue = (IndexedRecord) value;

        Object boolVal = anythingFieldValue(recordValue, ANYTHING_SCHEMA_BOOLEAN_FIELD);
        if (boolVal != null) {
            return toConnectData(SchemaBuilder.bool().build(), boolVal, toConnectContext);
        }

        Object bytesVal = anythingFieldValue(recordValue, ANYTHING_SCHEMA_BYTES_FIELD);
        if (bytesVal != null) {
            return toConnectData(SchemaBuilder.bytes().build(), bytesVal, toConnectContext);
        }

        Object dblVal = anythingFieldValue(recordValue, ANYTHING_SCHEMA_DOUBLE_FIELD);
        if (dblVal != null) {
            return toConnectData(SchemaBuilder.float64().build(), dblVal, toConnectContext);
        }

        Object fltVal = anythingFieldValue(recordValue, ANYTHING_SCHEMA_FLOAT_FIELD);
        if (fltVal != null) {
            return toConnectData(SchemaBuilder.float32().build(), fltVal, toConnectContext);
        }

        Object intVal = anythingFieldValue(recordValue, ANYTHING_SCHEMA_INT_FIELD);
        if (intVal != null) {
            return toConnectData(SchemaBuilder.int32().build(), intVal, toConnectContext);
        }

        Object longVal = anythingFieldValue(recordValue, ANYTHING_SCHEMA_LONG_FIELD);
        if (longVal != null) {
            return toConnectData(SchemaBuilder.int64().build(), longVal, toConnectContext);
        }

        Object stringVal = anythingFieldValue(recordValue, ANYTHING_SCHEMA_STRING_FIELD);
        if (stringVal != null) {
            return toConnectData(SchemaBuilder.string().build(), stringVal, toConnectContext);
        }

        Object arrayVal = anythingFieldValue(recordValue, ANYTHING_SCHEMA_ARRAY_FIELD);
        if (arrayVal != null) {
            // We cannot reuse the logic like we do in other cases because it is not possible to
            // construct an array schema with a null item schema, but the items have no schema.
            if (!(arrayVal instanceof Collection)) {
                throw new ConnectException(
                        "Expected a Collection for schemaless array field but found a "
                                + arrayVal.getClass().getName()
                );
            }
            Collection<Object> original = (Collection<Object>) arrayVal;
            List<Object> result = new ArrayList<>(original.size());
            for (Object elem : original) {
                result.add(toConnectData(null, elem, toConnectContext));
            }
            return result;
        }

        Object mapVal = anythingFieldValue(recordValue, ANYTHING_SCHEMA_MAP_FIELD);
        if (mapVal != null) {
            // We cannot reuse the logic like we do in other cases because it is not possible to
            // construct a map schema with a null item schema, but the items have no schema.
            if (!(mapVal instanceof Collection)) {
                throw new ConnectException(
                        "Expected a List for schemaless map field but found a "
                                + mapVal.getClass().getName()
                );
            }
            return toConnectMapFromEntries(
                    null, null, (Collection<IndexedRecord>) mapVal, toConnectContext);
        }

        // If nothing was set, it's null
        return null;
    }

    /** Reads the named {@code ANYTHING_SCHEMA} field from a schemaless wrapper record. */
    private Object anythingFieldValue(IndexedRecord recordValue, String fieldName) {
        return recordValue.get(ANYTHING_SCHEMA.getField(fieldName).pos());
    }

    /**
     * Builds a map from a collection of records holding {@code KEY_FIELD} and {@code VALUE_FIELD},
     * converting each key and value with the given schemas (which may be {@code null} for
     * schemaless entries).
     */
    private Map<Object, Object> toConnectMapFromEntries(Schema keySchema,
                                                        Schema valueSchema,
                                                        Collection<IndexedRecord> entries,
                                                        ToConnectContext toConnectContext) {
        Map<Object, Object> result = new HashMap<>(entries.size());
        for (IndexedRecord entry : entries) {
            int avroKeyFieldIndex = entry.getSchema().getField(KEY_FIELD).pos();
            int avroValueFieldIndex = entry.getSchema().getField(VALUE_FIELD).pos();
            Object convertedKey = toConnectData(
                    keySchema, entry.get(avroKeyFieldIndex), toConnectContext);
            Object convertedValue = toConnectData(
                    valueSchema, entry.get(avroValueFieldIndex), toConnectContext);
            result.put(convertedKey, convertedValue);
        }
        return result;
    }

    /**
     * Converts a value for a STRUCT schema: a union (schema named {@code AVRO_TYPE_UNION}), a
     * {@code Map} of field values, or an {@code IndexedRecord}.
     */
    private Object toConnectStruct(Schema schema, Object value, ToConnectContext toConnectContext) {
        // Special case support for union types
        if (schema.getName() != null && schema.getName().equals(AVRO_TYPE_UNION)) {
            Schema valueRecordSchema = null;
            if (value instanceof IndexedRecord) {
                IndexedRecord valueRecord = (IndexedRecord) value;
                valueRecordSchema = toConnectSchemaWithCycles(
                        valueRecord.getSchema(), true, null, null, toConnectContext);
            }
            Object unionStruct = null;
            for (Field field : schema.getFields()) {
                Schema fieldSchema = field.getSchema();
                if (isInstanceOfAvroSchemaTypeForSimpleSchema(
                        fieldSchema, value, enhancedSchemaSupport)
                        || (valueRecordSchema != null && schemaEquals(valueRecordSchema, fieldSchema))) {
                    // The first matching member wins; the struct carries only that member's field
                    unionStruct = new Struct(schema).put(
                            unionMemberFieldName(fieldSchema, enhancedSchemaSupport),
                            toConnectData(fieldSchema, value, toConnectContext));
                    break;
                }
            }
            if (unionStruct == null) {
                throw new ConnectException(
                        "Did not find matching union field for data: " + value);
            }
            return unionStruct;
        }

        if (value instanceof Map) {
            // Default values from Avro are returned as Map
            Map<CharSequence, Object> original = (Map<CharSequence, Object>) value;
            Struct result = new Struct(schema);
            for (Field field : schema.getFields()) {
                Object convertedFieldValue =
                        toConnectData(field.getSchema(), original.get(field.getName()), toConnectContext);
                result.put(field, convertedFieldValue);
            }
            return result;
        }

        IndexedRecord original = (IndexedRecord) value;
        Struct result = new Struct(schema);
        for (Field field : schema.getFields()) {
            int avroFieldIndex = original.getSchema().getField(field.getName()).pos();
            Object convertedFieldValue =
                    toConnectData(field.getSchema(), original.get(avroFieldIndex), toConnectContext);
            result.put(field, convertedFieldValue);
        }
        return result;
    }