in pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java [135:237]
/**
 * Coerces a decoded value to the exact Java type Kafka Connect expects for {@code kafkaSchema}.
 *
 * <p>Rules, applied in this order:
 * <ol>
 *   <li>{@code null} is returned unchanged, whatever the schema.</li>
 *   <li>If {@code PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema} accepts the schema, the value is
 *       mapped to the Java class of the Timestamp / Date / Time / Decimal logical type.</li>
 *   <li>A {@link Number} is re-boxed to the type matching an INT8, INT16, INT32, INT64, FLOAT32 or
 *       FLOAT64 schema. The conversion uses {@code Number.xxxValue()}, so out-of-range values are
 *       truncated rather than rejected.</li>
 *   <li>A {@link Character} becomes a one-character {@code String} for a STRING schema; for any other
 *       schema its {@link Character#getNumericValue(char)} is converted by these same rules.</li>
 *   <li>Any other {@link CharSequence} becomes a {@code String} for a STRING schema.</li>
 *   <li>Everything else is returned as is.</li>
 * </ol>
 *
 * @param nativeObject value to convert; may be {@code null}
 * @param kafkaSchema  target Kafka Connect schema
 * @return the converted value, or {@code nativeObject} itself when no conversion applies
 * @throws IllegalStateException if the schema is reported as logical but its name is not one of the
 *                               four supported logical type names
 * @throws ClassCastException    if the value for a logical schema is neither already of the logical
 *                               Java class nor of the raw representation expected for it
 */
public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) {
    // Without this guard a null value passed through untouched for plain schemas but failed with a
    // bare NullPointerException for logical ones; treat both the same way.
    if (nativeObject == null) {
        return null;
    }
    // special case for a few classes defined in org.apache.kafka.connect.data
    // and listed as LOGICAL_TYPE_CLASSES in org.apache.kafka.connect.data.ConnectSchema
    if (PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema(kafkaSchema)) {
        return castToKafkaLogicalType(nativeObject, kafkaSchema);
    }
    if (nativeObject instanceof Number) {
        // This is needed in case
        // jackson decided to fit value into some other type internally
        // (e.g. Double instead of Float).
        // Kafka's ConnectSchema expects exact type
        // https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L47-L71
        Number num = (Number) nativeObject;
        switch (kafkaSchema.type()) {
            case INT8:
                if (!(nativeObject instanceof Byte)) {
                    if (log.isDebugEnabled()) {
                        log.debug("nativeObject of type {} converted to Byte", nativeObject.getClass());
                    }
                    return num.byteValue();
                }
                break;
            case INT16:
                if (!(nativeObject instanceof Short)) {
                    if (log.isDebugEnabled()) {
                        log.debug("nativeObject of type {} converted to Short", nativeObject.getClass());
                    }
                    return num.shortValue();
                }
                break;
            case INT32:
                if (!(nativeObject instanceof Integer)) {
                    if (log.isDebugEnabled()) {
                        log.debug("nativeObject of type {} converted to Integer", nativeObject.getClass());
                    }
                    return num.intValue();
                }
                break;
            case INT64:
                if (!(nativeObject instanceof Long)) {
                    if (log.isDebugEnabled()) {
                        log.debug("nativeObject of type {} converted to Long", nativeObject.getClass());
                    }
                    return num.longValue();
                }
                break;
            case FLOAT32:
                if (!(nativeObject instanceof Float)) {
                    if (log.isDebugEnabled()) {
                        log.debug("nativeObject of type {} converted to Float", nativeObject.getClass());
                    }
                    return num.floatValue();
                }
                break;
            case FLOAT64:
                if (!(nativeObject instanceof Double)) {
                    if (log.isDebugEnabled()) {
                        log.debug("nativeObject of type {} converted to Double", nativeObject.getClass());
                    }
                    return num.doubleValue();
                }
                break;
            default:
                // Non-numeric target schema: leave the number for the checks below.
                break;
        }
    }
    if (nativeObject instanceof Character) {
        Character ch = (Character) nativeObject;
        if (kafkaSchema.type() == Schema.Type.STRING) {
            return ch.toString();
        }
        // Character.getNumericValue yields the Unicode numeric value ('7' -> 7, 'a' -> 10) and a
        // negative number when the character has none.
        // NOTE(review): confirm this, rather than the char code, is the intended mapping.
        return castToKafkaSchema(Character.getNumericValue(ch), kafkaSchema);
    }
    if (kafkaSchema.type() == Schema.Type.STRING && nativeObject instanceof CharSequence) {
        // e.g. org.apache.avro.util.Utf8
        return nativeObject.toString();
    }
    return nativeObject;
}

// Maps a non-null value to the Java class backing one of the Kafka Connect logical types.
private static Object castToKafkaLogicalType(Object nativeObject, Schema kafkaSchema) {
    final String logicalName = kafkaSchema.name();
    if (Timestamp.LOGICAL_NAME.equals(logicalName)) {
        if (nativeObject instanceof java.util.Date) {
            return nativeObject;
        }
        return Timestamp.toLogical(kafkaSchema, ((Number) nativeObject).longValue());
    }
    if (Date.LOGICAL_NAME.equals(logicalName)) {
        if (nativeObject instanceof java.util.Date) {
            return nativeObject;
        }
        return Date.toLogical(kafkaSchema, ((Number) nativeObject).intValue());
    }
    if (Time.LOGICAL_NAME.equals(logicalName)) {
        if (nativeObject instanceof java.util.Date) {
            return nativeObject;
        }
        return Time.toLogical(kafkaSchema, ((Number) nativeObject).intValue());
    }
    if (Decimal.LOGICAL_NAME.equals(logicalName)) {
        if (nativeObject instanceof java.math.BigDecimal) {
            return nativeObject;
        }
        return Decimal.toLogical(kafkaSchema, (byte[]) nativeObject);
    }
    throw new IllegalStateException("Unsupported Kafka Logical Schema " + kafkaSchema.name()
            + " for value " + nativeObject);
}