static Object jsonAsConnectData()

in pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java [271:382]


    /**
     * Converts a JSON node into the Kafka Connect value described by {@code kafkaSchema}.
     *
     * <p>Null handling:
     * <ul>
     *   <li>{@code kafkaSchema == null}: a {@code null} / JSON-null node yields {@code null};
     *       any other node is rejected with a {@link DataException}.</li>
     *   <li>{@code kafkaSchema != null}: a {@code null} / JSON-null node is delegated to
     *       {@code defaultOrThrow(kafkaSchema)}.</li>
     * </ul>
     *
     * <p>Schemas accepted by {@code PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema} are
     * dispatched on the schema name (Timestamp, Date, Time, Decimal). Timestamp/Date/Time are only
     * accepted as JSON numbers; textual dates are not supported because the format to parse is
     * not defined. Decimal is accepted either as a JSON number or as a binary/base64 node.
     *
     * <p>All other schemas are dispatched on {@code kafkaSchema.type()}; ARRAY, MAP and STRUCT
     * recurse into their element / value / field schemas.
     *
     * @param jsonNode    the JSON value to convert, may be {@code null}
     * @param kafkaSchema the target Kafka Connect schema, may be {@code null}
     * @return the converted value (boxed primitive, {@code String}, {@code byte[]}, {@code List},
     *         {@code Map}, {@code Struct}, a logical-type value), or {@code null}
     * @throws DataException            if the schema is {@code null} for a non-null node, the binary
     *                                  value cannot be read, or the schema type is unknown
     * @throws IllegalStateException    if a logical schema is unsupported or a Decimal node cannot be
     *                                  converted
     * @throws IllegalArgumentException if the node kind does not match the schema (raised through
     *                                  {@code Preconditions.checkArgument}; assumes Guava semantics)
     */
    static Object jsonAsConnectData(JsonNode jsonNode, Schema kafkaSchema) {
        if (kafkaSchema == null) {
            if (jsonNode == null || jsonNode.isNull()) {
                return null;
            }
            throw new DataException("Don't know how to convert " + jsonNode
                + " to Connect data (schema is null).");
        }

        if (jsonNode == null || jsonNode.isNull()) {
            return defaultOrThrow(kafkaSchema);
        }

        // special case for a few classes defined in org.apache.kafka.connect.data
        // and listed as LOGICAL_TYPE_CLASSES in org.apache.kafka.connect.data.ConnectSchema
        // time/date as String not supported as the format to parse is not clear
        // (add it as a config param?)
        if (PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema(kafkaSchema)) {
            final String logicalName = kafkaSchema.name();
            if (Timestamp.LOGICAL_NAME.equals(logicalName)) {
                // longValue()/intValue() yield 0 for non-numeric nodes, which would silently
                // turn e.g. a textual date into the epoch; reject such input explicitly.
                Preconditions.checkArgument(jsonNode.isNumber(),
                        "jsonNode has to be a number for logical type " + logicalName);
                return Timestamp.toLogical(kafkaSchema, jsonNode.longValue());
            } else if (Date.LOGICAL_NAME.equals(logicalName)) {
                Preconditions.checkArgument(jsonNode.isNumber(),
                        "jsonNode has to be a number for logical type " + logicalName);
                return Date.toLogical(kafkaSchema, jsonNode.intValue());
            } else if (Time.LOGICAL_NAME.equals(logicalName)) {
                Preconditions.checkArgument(jsonNode.isNumber(),
                        "jsonNode has to be a number for logical type " + logicalName);
                return Time.toLogical(kafkaSchema, jsonNode.intValue());
            } else if (Decimal.LOGICAL_NAME.equals(logicalName)) {
                if (jsonNode.isNumber()) {
                    return jsonNode.decimalValue();
                }
                final byte[] unscaledBytes;
                try {
                    unscaledBytes = jsonNode.binaryValue();
                } catch (IOException e) {
                    // keep the cause so that the decoding failure is diagnosable
                    throw new IllegalStateException("Could not convert Kafka Logical Schema " + kafkaSchema.name()
                            + " for jsonNode " + jsonNode + " into Decimal", e);
                }
                if (unscaledBytes == null) {
                    // binaryValue() returns null for nodes that are neither binary nor textual;
                    // fail with a meaningful message instead of an NPE inside Decimal.toLogical
                    throw new IllegalStateException("Could not convert Kafka Logical Schema " + kafkaSchema.name()
                            + " for jsonNode " + jsonNode + " into Decimal");
                }
                return Decimal.toLogical(kafkaSchema, unscaledBytes);
            }
            throw new IllegalStateException("Unsupported Kafka Logical Schema " + kafkaSchema.name()
                    + " for jsonNode " + jsonNode);
        }

        switch (kafkaSchema.type()) {
            case INT8:
                Preconditions.checkArgument(jsonNode.isNumber());
                return (byte) jsonNode.shortValue();
            case INT16:
                Preconditions.checkArgument(jsonNode.isNumber());
                return jsonNode.shortValue();
            case INT32:
                if (jsonNode.isTextual() && jsonNode.textValue().length() == 1) {
                    // char encoded as String instead of Integer
                    // NOTE(review): Character.getNumericValue yields the numeric value of the
                    // character ('7' -> 7, 'a' -> 10, -1 if none), not its code point; kept as-is
                    // because callers may rely on it - confirm this is the intended mapping.
                    return Character.getNumericValue(jsonNode.textValue().charAt(0));
                }
                Preconditions.checkArgument(jsonNode.isNumber());
                return jsonNode.intValue();
            case INT64:
                Preconditions.checkArgument(jsonNode.isNumber());
                return jsonNode.longValue();
            case FLOAT32:
                Preconditions.checkArgument(jsonNode.isNumber());
                return jsonNode.floatValue();
            case FLOAT64:
                Preconditions.checkArgument(jsonNode.isNumber());
                return jsonNode.doubleValue();
            case BOOLEAN:
                Preconditions.checkArgument(jsonNode.isBoolean());
                return jsonNode.booleanValue();
            case STRING:
                Preconditions.checkArgument(jsonNode.isTextual());
                return jsonNode.textValue();
            case BYTES:
                Preconditions.checkArgument(jsonNode.isBinary());
                try {
                    return jsonNode.binaryValue();
                } catch (IOException e) {
                    throw new DataException("Cannot get binary value for " + jsonNode
                            + " with schema " + kafkaSchema, e);
                }
            case ARRAY:
                if (jsonNode.isTextual() && kafkaSchema.valueSchema().type() == Schema.Type.INT32) {
                    // char[] encoded as String in json; each char is mapped the same way as
                    // the single-char INT32 case above
                    char[] chars = jsonNode.textValue().toCharArray();
                    List<Object> numericValues = new ArrayList<>(chars.length);
                    for (char ch : chars) {
                        numericValues.add(Character.getNumericValue(ch));
                    }
                    return numericValues;
                }

                Preconditions.checkArgument(jsonNode.isArray(), "jsonNode has to be an array");
                List<Object> list = new ArrayList<>(jsonNode.size());
                for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext();) {
                    list.add(jsonAsConnectData(it.next(), kafkaSchema.valueSchema()));
                }
                return list;
            case MAP:
                Preconditions.checkArgument(jsonNode.isObject(), "jsonNode has to be an Object node");
                // JSON object keys are always strings, so only STRING-keyed maps can be represented
                Preconditions.checkArgument(kafkaSchema.keySchema().type() == Schema.Type.STRING,
                        "kafka schema for json map is expected to be STRING");
                Map<String, Object> map = new HashMap<>();
                for (Iterator<Map.Entry<String, JsonNode>> it = jsonNode.fields(); it.hasNext(); ) {
                    Map.Entry<String, JsonNode> elem = it.next();
                    map.put(elem.getKey(),
                            jsonAsConnectData(elem.getValue(), kafkaSchema.valueSchema()));
                }
                return map;
            case STRUCT:
                Struct struct = new Struct(kafkaSchema);
                for (Field field : kafkaSchema.fields()) {
                    // a field missing from the JSON is looked up as null and therefore resolved
                    // through the null-node path of the recursive call
                    struct.put(field, jsonAsConnectData(jsonNode.get(field.name()), field.schema()));
                }
                return struct;
            default:
                throw new DataException("Unknown schema type " + kafkaSchema.type());
        }
    }