in pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java [271:382]
/**
 * Converts a JSON node into the Java value expected by Kafka Connect for the given schema.
 *
 * <p>Resolution order:
 * <ol>
 *   <li>No schema: only a missing/JSON-null node is accepted (yields {@code null}).</li>
 *   <li>Missing/JSON-null node with a schema: delegates to {@code defaultOrThrow(kafkaSchema)}
 *       (presumably returns the schema default or rejects a required value - confirm against
 *       that helper).</li>
 *   <li>Schemas recognised by {@code PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema}:
 *       Timestamp, Date, Time and Decimal are converted by schema name.</li>
 *   <li>Everything else is converted according to {@code kafkaSchema.type()}; ARRAY, MAP and
 *       STRUCT recurse into their element/value/field schemas.</li>
 * </ol>
 *
 * @param jsonNode    the JSON value to convert; may be {@code null} or a JSON null
 * @param kafkaSchema the target Kafka Connect schema; may be {@code null}
 * @return the converted value, or {@code null} when both the schema and the value are absent
 * @throws DataException            if the schema is {@code null} but a value is present, if a
 *                                  BYTES value cannot be read, or if the schema type is unknown
 * @throws IllegalStateException    if a logical schema is unsupported or a Decimal value cannot
 *                                  be decoded from its binary form
 * @throws IllegalArgumentException if the node kind does not match the schema type (raised by
 *                                  the {@code Preconditions.checkArgument} checks)
 */
static Object jsonAsConnectData(JsonNode jsonNode, Schema kafkaSchema) {
    if (kafkaSchema == null) {
        if (jsonNode == null || jsonNode.isNull()) {
            return null;
        }
        throw new DataException("Don't know how to convert " + jsonNode
                + " to Connect data (schema is null).");
    }
    if (jsonNode == null || jsonNode.isNull()) {
        return defaultOrThrow(kafkaSchema);
    }
    // Special case for a few classes defined in org.apache.kafka.connect.data
    // and listed as LOGICAL_TYPE_CLASSES in org.apache.kafka.connect.data.ConnectSchema.
    // Time/date encoded as String is not supported because the format to parse is not clear
    // (add it as a config param?).
    // NOTE(review): the Timestamp/Date/Time branches do not check jsonNode.isNumber(); Jackson's
    // longValue()/intValue() return 0 for non-numeric nodes, so textual input would map to the
    // epoch - confirm whether that should be rejected instead.
    if (PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema(kafkaSchema)) {
        if (Timestamp.LOGICAL_NAME.equals(kafkaSchema.name())) {
            return Timestamp.toLogical(kafkaSchema, jsonNode.longValue());
        } else if (Date.LOGICAL_NAME.equals(kafkaSchema.name())) {
            return Date.toLogical(kafkaSchema, jsonNode.intValue());
        } else if (Time.LOGICAL_NAME.equals(kafkaSchema.name())) {
            return Time.toLogical(kafkaSchema, jsonNode.intValue());
        } else if (Decimal.LOGICAL_NAME.equals(kafkaSchema.name())) {
            if (jsonNode.isNumber()) {
                // Numeric JSON is returned as-is, without going through Decimal.toLogical.
                return jsonNode.decimalValue();
            }
            try {
                return Decimal.toLogical(kafkaSchema, jsonNode.binaryValue());
            } catch (IOException e) {
                // Keep the underlying decoding failure as the cause so it is not lost.
                throw new IllegalStateException("Could not convert Kafka Logical Schema " + kafkaSchema.name()
                        + " for jsonNode " + jsonNode + " into Decimal", e);
            }
        }
        throw new IllegalStateException("Unsupported Kafka Logical Schema " + kafkaSchema.name()
                + " for jsonNode " + jsonNode);
    }
    switch (kafkaSchema.type()) {
        case INT8:
            Preconditions.checkArgument(jsonNode.isNumber());
            return (byte) jsonNode.shortValue();
        case INT16:
            Preconditions.checkArgument(jsonNode.isNumber());
            return jsonNode.shortValue();
        case INT32:
            if (jsonNode.isTextual() && jsonNode.textValue().length() == 1) {
                // char encoded as String instead of Integer
                // NOTE(review): Character.getNumericValue yields the character's numeric/digit
                // value (e.g. '7' -> 7, 'a' -> 10, -1 if none), not its code point - confirm intent.
                return Character.getNumericValue(jsonNode.textValue().charAt(0));
            }
            Preconditions.checkArgument(jsonNode.isNumber());
            return jsonNode.intValue();
        case INT64:
            Preconditions.checkArgument(jsonNode.isNumber());
            return jsonNode.longValue();
        case FLOAT32:
            Preconditions.checkArgument(jsonNode.isNumber());
            return jsonNode.floatValue();
        case FLOAT64:
            Preconditions.checkArgument(jsonNode.isNumber());
            return jsonNode.doubleValue();
        case BOOLEAN:
            Preconditions.checkArgument(jsonNode.isBoolean());
            return jsonNode.booleanValue();
        case STRING:
            Preconditions.checkArgument(jsonNode.isTextual());
            return jsonNode.textValue();
        case BYTES:
            Preconditions.checkArgument(jsonNode.isBinary());
            try {
                return jsonNode.binaryValue();
            } catch (IOException e) {
                // Keep the underlying read failure as the cause so it is not lost.
                throw new DataException("Cannot get binary value for " + jsonNode + " with schema " + kafkaSchema, e);
            }
        case ARRAY:
            if (jsonNode.isTextual() && kafkaSchema.valueSchema().type() == Schema.Type.INT32) {
                // char[] encoded as String in json; each char is mapped the same way as the
                // single-char INT32 case above.
                List<Object> list = new ArrayList<>();
                for (char ch: jsonNode.textValue().toCharArray()) {
                    list.add(Character.getNumericValue(ch));
                }
                return list;
            }
            Preconditions.checkArgument(jsonNode.isArray(), "jsonNode has to be an array");
            List<Object> list = new ArrayList<>();
            for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext();) {
                list.add(jsonAsConnectData(it.next(), kafkaSchema.valueSchema()));
            }
            return list;
        case MAP:
            Preconditions.checkArgument(jsonNode.isObject(), "jsonNode has to be an Object node");
            // JSON object keys are always strings, so only STRING-keyed map schemas are accepted.
            Preconditions.checkArgument(kafkaSchema.keySchema().type() == Schema.Type.STRING,
                    "kafka schema for json map is expected to be STRING");
            Map<String, Object> map = new HashMap<>();
            for (Iterator<Map.Entry<String, JsonNode>> it = jsonNode.fields(); it.hasNext(); ) {
                Map.Entry<String, JsonNode> elem = it.next();
                map.put(elem.getKey(),
                        jsonAsConnectData(elem.getValue(), kafkaSchema.valueSchema()));
            }
            return map;
        case STRUCT:
            Struct struct = new Struct(kafkaSchema);
            for (Field field: kafkaSchema.fields()) {
                // A field absent from the JSON yields a null node, which the recursive call
                // routes to defaultOrThrow for that field's schema.
                struct.put(field, jsonAsConnectData(jsonNode.get(field.name()), field.schema()));
            }
            return struct;
        default:
            throw new DataException("Unknown schema type " + kafkaSchema.type());
    }
}