in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java [40:67]
public static <T extends Throwable> Optional<T> findThrowableSerializedAware(
Throwable throwable, Class<T> searchType, ClassLoader classLoader) {
if (throwable == null || searchType == null) {
return Optional.empty();
}
Throwable t = throwable;
while (t != null) {
if (searchType.isAssignableFrom(t.getClass())) {
return Optional.of(searchType.cast(t));
} else if (t instanceof SerializedThrowable) {
var deserialized = ((SerializedThrowable) t).deserializeError(classLoader);
// This is the key part of the fix:
// The deserializeError method returns the same SerializedThrowable if it cannot
// deserialize it. Previously this is what caused the infinite loop.
t =
deserialized == t
? new DeserializationException(
"Could not deserialize SerializedThrowable")
: deserialized;
} else {
t = t.getCause();
}
}
return Optional.empty();
}