public static Object castToKafkaSchema()

in pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java [135:237]


    public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) {
        // special case for a few classes defined in org.apache.kafka.connect.data
        // and listed as LOGICAL_TYPE_CLASSES in org.apache.kafka.connect.data.ConnectSchema
        if (PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema(kafkaSchema)) {
            if (Timestamp.LOGICAL_NAME.equals(kafkaSchema.name())) {
                if (nativeObject instanceof java.util.Date) {
                    return nativeObject;
                }
                return Timestamp.toLogical(kafkaSchema, ((Number) nativeObject).longValue());
            } else if (Date.LOGICAL_NAME.equals(kafkaSchema.name())) {
                if (nativeObject instanceof java.util.Date) {
                    return nativeObject;
                }
                return Date.toLogical(kafkaSchema, ((Number) nativeObject).intValue());
            } else if (Time.LOGICAL_NAME.equals(kafkaSchema.name())) {
                if (nativeObject instanceof java.util.Date) {
                    return nativeObject;
                }
                return Time.toLogical(kafkaSchema, ((Number) nativeObject).intValue());
            } else if (Decimal.LOGICAL_NAME.equals(kafkaSchema.name())) {
                if (nativeObject instanceof java.math.BigDecimal) {
                    return nativeObject;
                }
                return Decimal.toLogical(kafkaSchema, (byte[]) nativeObject);
            }
            throw new IllegalStateException("Unsupported Kafka Logical Schema " + kafkaSchema.name()
                    + " for value " + nativeObject);
        }

        if (nativeObject instanceof Number) {
            // This is needed in case
            // jackson decided to fit value into some other type internally
            // (e.g. Double instead of Float).
            // Kafka's ConnectSchema expects exact type
            // https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L47-L71
            Number num = (Number) nativeObject;
            switch (kafkaSchema.type()) {
                case INT8:
                    if (!(nativeObject instanceof Byte)) {
                        if (log.isDebugEnabled()) {
                            log.debug("nativeObject of type {} converted to Byte", nativeObject.getClass());
                        }
                        return num.byteValue();
                    }
                    break;
                case INT16:
                    if (!(nativeObject instanceof Short)) {
                        if (log.isDebugEnabled()) {
                            log.debug("nativeObject of type {} converted to Short", nativeObject.getClass());
                        }
                        return num.shortValue();
                    }
                    break;
                case INT32:
                    if (!(nativeObject instanceof Integer)) {
                        if (log.isDebugEnabled()) {
                            log.debug("nativeObject of type {} converted to Integer", nativeObject.getClass());
                        }
                        return num.intValue();
                    }
                    break;
                case INT64:
                    if (!(nativeObject instanceof Long)) {
                        if (log.isDebugEnabled()) {
                            log.debug("nativeObject of type {} converted to Long", nativeObject.getClass());
                        }
                        return num.longValue();
                    }
                    break;
                case FLOAT32:
                    if (!(nativeObject instanceof Float)) {
                        if (log.isDebugEnabled()) {
                            log.debug("nativeObject of type {} converted to Float", nativeObject.getClass());
                        }
                        return num.floatValue();
                    }
                    break;
                case FLOAT64:
                    if (!(nativeObject instanceof Double)) {
                        if (log.isDebugEnabled()) {
                            log.debug("nativeObject of type {} converted to Double", nativeObject.getClass());
                        }
                        return num.doubleValue();
                    }
                    break;
            }
        }

        if (nativeObject instanceof Character) {
            Character ch = (Character) nativeObject;
            if (kafkaSchema.type() == Schema.Type.STRING) {
                return ch.toString();
            }
            return castToKafkaSchema(Character.getNumericValue(ch), kafkaSchema);
        }

        if (kafkaSchema.type() == Schema.Type.STRING && nativeObject instanceof CharSequence) {
            // e.g. org.apache.avro.util.Utf8
            return nativeObject.toString();
        }

        return nativeObject;
    }