in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java [543:576]
/**
 * Validates that the named class can be loaded, is a {@link Deserializer}, and, where its
 * generic signature reveals the deserialized type, that this type is {@code byte[]}.
 *
 * <p>The {@code Deserializer} type argument is looked up through the whole type hierarchy
 * (superclasses and super-interfaces), not only the interfaces declared directly on the class,
 * so a class that inherits {@code Deserializer<String>} from a parent is rejected as well.
 * Classes that only expose the raw {@code Deserializer} type carry no type information and are
 * accepted.
 *
 * @param deserializer fully qualified name of the deserializer class to validate
 * @throws IllegalArgumentException if the class cannot be found, does not implement {@link
 *     Deserializer}, or is declared to deserialize something other than {@code byte[]}
 */
private void checkDeserializer(String deserializer) {
    final Class<?> deserClass;
    try {
        // Only the class lookup can raise ClassNotFoundException, so keep the try narrow.
        deserClass = Class.forName(deserializer);
    } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException(
                String.format("Deserializer class %s not found", deserializer), e);
    }

    if (!Deserializer.class.isAssignableFrom(deserClass)) {
        throw new IllegalArgumentException(
                String.format(
                        "Deserializer class %s is not a subclass of %s",
                        deserializer, Deserializer.class.getName()));
    }

    // null means "no generic information available" (raw usage); that case is accepted.
    Type deserializedType = resolveDeserializedType(deserClass);
    if (deserializedType != null && deserializedType != byte[].class) {
        throw new IllegalArgumentException(
                String.format(
                        "Deserializer class %s does not deserialize byte[]", deserializer));
    }
}

/**
 * Resolves the type argument bound to {@link Deserializer} as seen from {@code type}.
 *
 * @param type a class or parameterized type to inspect
 * @return the resolved type argument; a type variable if it stays unbound at this level; or
 *     {@code null} if {@code type} does not reach a parameterized {@code Deserializer}
 */
private static Type resolveDeserializedType(Type type) {
    final Class<?> raw;
    final Type[] actualArgs;
    if (type instanceof ParameterizedType) {
        ParameterizedType parameterized = (ParameterizedType) type;
        Type rawType = parameterized.getRawType();
        if (!(rawType instanceof Class)) {
            return null;
        }
        raw = (Class<?>) rawType;
        actualArgs = parameterized.getActualTypeArguments();
    } else if (type instanceof Class) {
        raw = (Class<?>) type;
        actualArgs = null;
    } else {
        return null;
    }

    if (raw == Deserializer.class) {
        if (actualArgs == null) {
            // Raw Deserializer: nothing to verify.
            return null;
        }
        // An unexpected argument count is reported as the parameterized type itself, which is
        // never byte[] and therefore still fails validation in the caller.
        return actualArgs.length == 1 ? actualArgs[0] : type;
    }

    Type resolved = null;
    for (Type iface : raw.getGenericInterfaces()) {
        resolved = resolveInheritedDeserializedType(iface);
        if (resolved != null) {
            break;
        }
    }
    if (resolved == null) {
        Type superclass = raw.getGenericSuperclass();
        if (superclass != null) {
            resolved = resolveInheritedDeserializedType(superclass);
        }
    }

    // The supertypes of 'raw' are expressed in terms of raw's own type parameters; replace such
    // a parameter with the argument supplied at this level, if there is one.
    if (resolved instanceof java.lang.reflect.TypeVariable && actualArgs != null) {
        java.lang.reflect.TypeVariable<?>[] typeParameters = raw.getTypeParameters();
        for (int i = 0; i < typeParameters.length && i < actualArgs.length; i++) {
            if (typeParameters[i].equals(resolved)) {
                return actualArgs[i];
            }
        }
    }
    return resolved;
}

/**
 * Resolves the {@link Deserializer} type argument contributed by one supertype, treating a
 * generic supertype that is referenced without type arguments as carrying no information.
 */
private static Type resolveInheritedDeserializedType(Type supertype) {
    Type found = resolveDeserializedType(supertype);
    if (supertype instanceof Class && found instanceof java.lang.reflect.TypeVariable) {
        // The supertype is used raw, so its type variable is erased rather than bound.
        return null;
    }
    return found;
}