in connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java [51:97]
/**
 * Recursively converts a value produced by Kafka Connect into the
 * representation described by {@code targetSchema}.
 *
 * <ul>
 *   <li>No schema: the value is returned untouched.</li>
 *   <li>Primitive types (INT8..FLOAT64, BOOLEAN, STRING, BYTES): the value
 *       is returned untouched.</li>
 *   <li>STRUCT: a new {@code Struct} is created for {@code targetSchema}
 *       and, when the source value is non-null, populated through
 *       {@code convertStructValue}.</li>
 *   <li>ARRAY: every element is converted with the schema's value schema.</li>
 *   <li>MAP: every key and value is converted with the schema's key and
 *       value schemas respectively.</li>
 * </ul>
 *
 * @param targetSchema  schema describing the desired result; may be {@code null}
 * @param originalValue the Kafka Connect value to convert; may be {@code null}
 * @return the converted value; {@code null} for a {@code null} primitive,
 *         array or map value
 * @throws RuntimeException if the schema's field type is not handled here
 */
private Object convertKafkaValue(Schema targetSchema, Object originalValue) {
    // Without a schema there is nothing to drive a conversion.
    if (targetSchema == null) {
        return originalValue;
    }
    switch (targetSchema.getFieldType()) {
        case INT8:
        case INT16:
        case INT32:
        case INT64:
        case FLOAT32:
        case FLOAT64:
        case BOOLEAN:
        case STRING:
        case BYTES:
            return originalValue;
        case STRUCT:
            // NOTE(review): a null source struct yields an empty Struct rather
            // than null; kept as-is because callers may rely on it - confirm.
            Struct toStruct = new Struct(targetSchema);
            if (originalValue != null) {
                convertStructValue(toStruct, (org.apache.kafka.connect.data.Struct) originalValue);
            }
            return toStruct;
        case ARRAY:
            // A null array value stays null instead of failing on iteration.
            if (originalValue == null) {
                return null;
            }
            List<?> array = (List<?>) originalValue;
            Schema elementSchema = targetSchema.getValueSchema();
            List<Object> newArray = new ArrayList<>(array.size());
            for (Object item : array) {
                newArray.add(convertKafkaValue(elementSchema, item));
            }
            return newArray;
        case MAP:
            // A null map value stays null instead of failing on iteration.
            if (originalValue == null) {
                return null;
            }
            Map<?, ?> mapData = (Map<?, ?>) originalValue;
            Schema keySchema = targetSchema.getKeySchema();
            Schema valueSchema = targetSchema.getValueSchema();
            // ConcurrentHashMap rejects null keys/values, so an entry with a
            // null value would throw; LinkedHashMap accepts nulls and also
            // keeps the source map's iteration order.
            Map<Object, Object> newMapData = new java.util.LinkedHashMap<>();
            for (Map.Entry<?, ?> entry : mapData.entrySet()) {
                newMapData.put(
                    convertKafkaValue(keySchema, entry.getKey()),
                    convertKafkaValue(valueSchema, entry.getValue())
                );
            }
            return newMapData;
        default:
            throw new RuntimeException("Type not supported: " + targetSchema.getFieldType());
    }
}