private void checkDeserializer()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java [543:576]


    /**
     * Validates the configured deserializer class.
     *
     * <p>The class must be loadable, must implement {@link Deserializer}, and, whenever its
     * generic signature exposes the {@code Deserializer} type argument, that argument must be
     * {@code byte[]}. The type argument is looked up through the whole type hierarchy
     * (superclasses and super-interfaces), not only the interfaces declared directly on the
     * class, so a subclass of e.g. a {@code Deserializer<String>} implementation is rejected
     * as well.
     *
     * <p>A class that implements {@code Deserializer} as a raw type carries no generic
     * information and is accepted without further checks. A type argument that stays an
     * unbound type variable cannot be proven to be {@code byte[]} and is rejected.
     *
     * @param deserializer fully qualified name of the deserializer class to validate
     * @throws IllegalArgumentException if the class cannot be found, does not implement {@link
     *     Deserializer}, or is parameterized with a type other than {@code byte[]}
     */
    private void checkDeserializer(String deserializer) {
        final Class<?> deserClass;
        try {
            deserClass = Class.forName(deserializer);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(
                    String.format("Deserializer class %s not found", deserializer), e);
        }

        if (!Deserializer.class.isAssignableFrom(deserClass)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Deserializer class %s is not a subclass of %s",
                            deserializer, Deserializer.class.getName()));
        }

        // null means "no generic information available" (raw Deserializer), which is tolerated.
        Type deserializedType = resolveDeserializedType(deserClass);
        if (deserializedType != null && deserializedType != byte[].class) {
            throw new IllegalArgumentException(
                    String.format(
                            "Deserializer class %s does not deserialize byte[]", deserializer));
        }
    }

    /**
     * Finds the type argument bound to {@link Deserializer} as seen from {@code current}.
     *
     * <p>The search recurses through the generic super-interfaces and the generic superclass.
     * On the way back, a type variable declared by the class of a parameterized supertype is
     * replaced by the actual argument supplied for it, so {@code Impl extends Base<byte[]>}
     * with {@code Base<T> implements Deserializer<T>} resolves to {@code byte[]}.
     *
     * @param current a class or parameterized type from the hierarchy being inspected
     * @return the (possibly still unresolved) type argument of {@code Deserializer}, or {@code
     *     null} if {@code Deserializer} is not reachable from {@code current} or is only used
     *     as a raw type
     */
    private static Type resolveDeserializedType(Type current) {
        final Class<?> rawClass;
        final Type[] actualArguments;
        if (current instanceof Class) {
            rawClass = (Class<?>) current;
            actualArguments = null;
        } else if (current instanceof ParameterizedType
                && ((ParameterizedType) current).getRawType() instanceof Class) {
            ParameterizedType parameterized = (ParameterizedType) current;
            rawClass = (Class<?>) parameterized.getRawType();
            actualArguments = parameterized.getActualTypeArguments();
        } else {
            return null;
        }

        if (rawClass == Deserializer.class) {
            // Raw usage has no arguments to report.
            return actualArguments != null && actualArguments.length == 1
                    ? actualArguments[0]
                    : null;
        }
        if (!Deserializer.class.isAssignableFrom(rawClass)) {
            // This branch of the hierarchy can never lead to Deserializer.
            return null;
        }

        Type resolved = null;
        for (Type parentInterface : rawClass.getGenericInterfaces()) {
            resolved = resolveDeserializedType(parentInterface);
            if (resolved != null) {
                break;
            }
        }
        if (resolved == null && rawClass.getGenericSuperclass() != null) {
            resolved = resolveDeserializedType(rawClass.getGenericSuperclass());
        }

        // Substitute a type variable declared by rawClass with the argument supplied for it.
        if (resolved instanceof java.lang.reflect.TypeVariable && actualArguments != null) {
            java.lang.reflect.TypeVariable<?>[] declared = rawClass.getTypeParameters();
            for (int i = 0; i < declared.length && i < actualArguments.length; i++) {
                if (declared[i].equals(resolved)) {
                    return actualArguments[i];
                }
            }
        }
        return resolved;
    }