in avro-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/avrodata/AvroData.java [199:249]
static {
    // Registers one converter per Connect logical type, keyed by the logical type's name.
    // Each converter validates the Java type of the incoming value, delegates to the
    // matching toLogical(...) helper, and throws DataException on a type mismatch.
    // Note: a null value is not special-cased; building the error message calls
    // value.getClass(), which throws NullPointerException for null.

    // Decimal: accepts either a raw byte[] or a ByteBuffer.
    TO_CONNECT_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter() {
        @Override
        public Object convert(Schema schema, Object value) {
            if (value instanceof byte[]) {
                return Decimal.toLogical(schema, (byte[]) value);
            } else if (value instanceof ByteBuffer) {
                // Copy only the readable window [position, limit) of the buffer.
                // ByteBuffer.array() would return the whole backing array (ignoring
                // position, limit and arrayOffset) and throws for read-only or direct
                // buffers. Working on a duplicate keeps the caller's position untouched.
                ByteBuffer readable = ((ByteBuffer) value).duplicate();
                byte[] unscaled = new byte[readable.remaining()];
                readable.get(unscaled);
                return Decimal.toLogical(schema, unscaled);
            }
            throw new DataException(
                    "Invalid type for Decimal, underlying representation should be bytes but was "
                            + value.getClass());
        }
    });

    // Date: requires an Integer value.
    TO_CONNECT_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new LogicalTypeConverter() {
        @Override
        public Object convert(Schema schema, Object value) {
            if (!(value instanceof Integer)) {
                throw new DataException(
                        "Invalid type for Date, underlying representation should be int32 but was "
                                + value.getClass());
            }
            return Date.toLogical(schema, (int) value);
        }
    });

    // Time: requires an Integer value.
    TO_CONNECT_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() {
        @Override
        public Object convert(Schema schema, Object value) {
            if (!(value instanceof Integer)) {
                throw new DataException(
                        "Invalid type for Time, underlying representation should be int32 but was "
                                + value.getClass());
            }
            return Time.toLogical(schema, (int) value);
        }
    });

    // Timestamp: requires a Long value.
    TO_CONNECT_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() {
        @Override
        public Object convert(Schema schema, Object value) {
            if (!(value instanceof Long)) {
                throw new DataException(
                        "Invalid type for Timestamp, underlying representation should be int64 but was "
                                + value.getClass());
            }
            return Timestamp.toLogical(schema, (long) value);
        }
    });
}