in core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java [44:70]
public static SchemaBuilder buildSchemaBuilderForType(final Object value) {
Objects.requireNonNull(value);
// gracefully try to infer the schema
final Schema knownSchema = Values.inferSchema(value);
if (knownSchema == null) {
// let's now check for other types
if (value instanceof Date) {
return org.apache.kafka.connect.data.Date.builder();
}
if (value instanceof BigDecimal) {
return Decimal.builder(((BigDecimal) value).scale());
}
// we re-check map and list since inferSchema function is not tolerant against map and list
// for now we rely on inferSchema, however it makes some point to build a specific inferSchema method only for this connector
if (value instanceof Map) {
return new SchemaBuilder(Schema.Type.MAP);
}
if (value instanceof List) {
return new SchemaBuilder(Schema.Type.ARRAY);
}
// if we do not fine any of schema out of the above, we just return an an optional byte schema
return SchemaBuilder.bytes().optional();
}
return SchemaUtil.copySchemaBasics(knownSchema);
}