private Object convertKafkaValue()

in connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java [49:95]


    /**
     * Recursively converts a value according to the given target schema.
     *
     * <p>Values of the scalar types (INT8..INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES)
     * are returned unchanged. A STRUCT is copied into a new {@code Struct} created from
     * {@code targetSchema} via {@code convertStructValue}. ARRAY elements and MAP keys/values
     * are converted recursively using the schema's value/key schemas.
     *
     * @param targetSchema  schema describing the expected result; when {@code null} the
     *                      original value is returned as-is
     * @param originalValue value to convert, may be {@code null}
     * @return the converted value; {@code null} when a {@code null} ARRAY or MAP value is given
     * @throws RuntimeException if the schema type is not one of the handled types
     */
    private Object convertKafkaValue(Schema targetSchema, Object originalValue) {
        if (targetSchema == null) {
            // Without a schema there is nothing to convert against (null stays null).
            return originalValue;
        }
        switch (targetSchema.type()) {
            case INT8:
            case INT16:
            case INT32:
            case INT64:
            case FLOAT32:
            case FLOAT64:
            case BOOLEAN:
            case STRING:
            case BYTES:
                return originalValue;
            case STRUCT: {
                // NOTE(review): a null original still yields an empty Struct (existing behavior,
                // kept for callers) — confirm whether null should be propagated instead.
                Struct toStruct = new Struct(targetSchema);
                if (originalValue != null) {
                    convertStructValue(toStruct, (org.apache.kafka.connect.data.Struct) originalValue);
                }
                return toStruct;
            }
            case ARRAY: {
                if (originalValue == null) {
                    // A missing array stays null instead of failing with a NullPointerException.
                    return null;
                }
                List<?> sourceItems = (List<?>) originalValue;
                Schema elementSchema = targetSchema.valueSchema();
                List<Object> convertedItems = new ArrayList<>(sourceItems.size());
                for (Object item : sourceItems) {
                    convertedItems.add(convertKafkaValue(elementSchema, item));
                }
                return convertedItems;
            }
            case MAP: {
                if (originalValue == null) {
                    // A missing map stays null instead of failing with a NullPointerException.
                    return null;
                }
                Map<?, ?> sourceEntries = (Map<?, ?>) originalValue;
                Schema keySchema = targetSchema.keySchema();
                Schema valueSchema = targetSchema.valueSchema();
                // ConcurrentHashMap rejects null keys/values, but converted entries may be null;
                // a LinkedHashMap accepts them and keeps the source iteration order.
                Map<Object, Object> convertedEntries = new java.util.LinkedHashMap<>();
                for (Map.Entry<?, ?> entry : sourceEntries.entrySet()) {
                    convertedEntries.put(
                            convertKafkaValue(keySchema, entry.getKey()),
                            convertKafkaValue(valueSchema, entry.getValue())
                    );
                }
                return convertedEntries;
            }
            default:
                throw new RuntimeException("Type not supported: " + targetSchema.type());
        }
    }