in connect/api/src/main/java/org/apache/kafka/connect/header/ConnectHeaders.java [397:471]
void checkSchemaMatches(SchemaAndValue schemaAndValue) {
if (schemaAndValue != null) {
Schema schema = schemaAndValue.schema();
if (schema == null)
return;
schema = schema.schema(); // in case a SchemaBuilder is used
Object value = schemaAndValue.value();
if (value == null && !schema.isOptional()) {
throw new DataException("A null value requires an optional schema but was " + schema);
}
if (value != null) {
switch (schema.type()) {
case BYTES:
if (value instanceof ByteBuffer)
return;
if (value instanceof byte[])
return;
if (value instanceof BigDecimal && Decimal.LOGICAL_NAME.equals(schema.name()))
return;
break;
case STRING:
if (value instanceof String)
return;
break;
case BOOLEAN:
if (value instanceof Boolean)
return;
break;
case INT8:
if (value instanceof Byte)
return;
break;
case INT16:
if (value instanceof Short)
return;
break;
case INT32:
if (value instanceof Integer)
return;
if (value instanceof java.util.Date && Date.LOGICAL_NAME.equals(schema.name()))
return;
if (value instanceof java.util.Date && Time.LOGICAL_NAME.equals(schema.name()))
return;
break;
case INT64:
if (value instanceof Long)
return;
if (value instanceof java.util.Date && Timestamp.LOGICAL_NAME.equals(schema.name()))
return;
break;
case FLOAT32:
if (value instanceof Float)
return;
break;
case FLOAT64:
if (value instanceof Double)
return;
break;
case ARRAY:
if (value instanceof List)
return;
break;
case MAP:
if (value instanceof Map)
return;
break;
case STRUCT:
if (value instanceof Struct)
return;
break;
}
throw new DataException("The value " + value + " is not compatible with the schema " + schema);
}
}
}