in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java [285:317]
private void handleError(final Throwable throwable) throws FanOutSubscriberException {
Throwable cause;
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
cause = throwable.getCause();
} else {
cause = throwable;
}
LOG.warn(
"Error occurred on EFO subscription: {} - ({}). {} ({})",
throwable.getClass().getName(),
throwable.getMessage(),
shardId,
consumerArn,
cause);
if (isInterrupted(throwable)) {
throw new FanOutSubscriberInterruptedException(throwable);
} else if (cause instanceof FanOutSubscriberException) {
throw (FanOutSubscriberException) cause;
} else if (cause instanceof ReadTimeoutException) {
// ReadTimeoutException occurs naturally under backpressure scenarios when full batches
// take longer to
// process than standard read timeout (default 30s). Recoverable exceptions are intended
// to be retried
// indefinitely to avoid system degradation under backpressure. The EFO connection
// (subscription) to Kinesis
// is closed, and reacquired once the queue of records has been processed.
throw new RecoverableFanOutSubscriberException(cause);
} else {
throw new RetryableFanOutSubscriberException(cause);
}
}