private void handleError()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java [285:317]


    private void handleError(final Throwable throwable) throws FanOutSubscriberException {
        Throwable cause;
        if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
            cause = throwable.getCause();
        } else {
            cause = throwable;
        }

        LOG.warn(
                "Error occurred on EFO subscription: {} - ({}).  {} ({})",
                throwable.getClass().getName(),
                throwable.getMessage(),
                shardId,
                consumerArn,
                cause);

        if (isInterrupted(throwable)) {
            throw new FanOutSubscriberInterruptedException(throwable);
        } else if (cause instanceof FanOutSubscriberException) {
            throw (FanOutSubscriberException) cause;
        } else if (cause instanceof ReadTimeoutException) {
            // ReadTimeoutException occurs naturally under backpressure scenarios when full batches
            // take longer to
            // process than standard read timeout (default 30s). Recoverable exceptions are intended
            // to be retried
            // indefinitely to avoid system degradation under backpressure. The EFO connection
            // (subscription) to Kinesis
            // is closed, and reacquired once the queue of records has been processed.
            throw new RecoverableFanOutSubscriberException(cause);
        } else {
            throw new RetryableFanOutSubscriberException(cause);
        }
    }