void checkSchemaMatches()

in connect/api/src/main/java/org/apache/kafka/connect/header/ConnectHeaders.java [397:471]


    /**
     * Verify that the Java type of the supplied value is acceptable for the accompanying schema.
     *
     * <p>Nothing is checked when {@code schemaAndValue} is null or carries no schema. A null value
     * is accepted only if the schema is optional. For a non-null value, the accepted Java types
     * depend on the schema's type; a few additional types are accepted when the schema's name
     * equals the corresponding logical type name ({@code Decimal}, {@code Date}, {@code Time},
     * {@code Timestamp}).
     *
     * @param schemaAndValue the schema and value pair to validate; may be null
     * @throws DataException if the value is null but the schema is not optional, or if the value's
     *                       type is not compatible with the schema
     */
    void checkSchemaMatches(SchemaAndValue schemaAndValue) {
        if (schemaAndValue == null) {
            return;
        }
        Schema declared = schemaAndValue.schema();
        if (declared == null) {
            // No schema means there is nothing to validate against
            return;
        }
        // Resolve the concrete schema in case a SchemaBuilder was supplied
        final Schema resolved = declared.schema();
        final Object payload = schemaAndValue.value();

        if (payload == null) {
            if (!resolved.isOptional()) {
                throw new DataException("A null value requires an optional schema but was " + resolved);
            }
            return;
        }

        final boolean compatible;
        switch (resolved.type()) {
            case BYTES:
                compatible = payload instanceof ByteBuffer
                        || payload instanceof byte[]
                        || (payload instanceof BigDecimal && Decimal.LOGICAL_NAME.equals(resolved.name()));
                break;
            case STRING:
                compatible = payload instanceof String;
                break;
            case BOOLEAN:
                compatible = payload instanceof Boolean;
                break;
            case INT8:
                compatible = payload instanceof Byte;
                break;
            case INT16:
                compatible = payload instanceof Short;
                break;
            case INT32:
                // java.util.Date is accepted for both the Date and the Time logical types
                compatible = payload instanceof Integer
                        || (payload instanceof java.util.Date
                            && (Date.LOGICAL_NAME.equals(resolved.name())
                                || Time.LOGICAL_NAME.equals(resolved.name())));
                break;
            case INT64:
                compatible = payload instanceof Long
                        || (payload instanceof java.util.Date && Timestamp.LOGICAL_NAME.equals(resolved.name()));
                break;
            case FLOAT32:
                compatible = payload instanceof Float;
                break;
            case FLOAT64:
                compatible = payload instanceof Double;
                break;
            case ARRAY:
                compatible = payload instanceof List;
                break;
            case MAP:
                compatible = payload instanceof Map;
                break;
            case STRUCT:
                compatible = payload instanceof Struct;
                break;
            default:
                // Any type not listed above is treated as a mismatch
                compatible = false;
                break;
        }

        if (!compatible) {
            throw new DataException("The value " + payload + " is not compatible with the schema " + resolved);
        }
    }