private void errorOccurred()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java [314:378]


    /**
     * Reacts to an error reported for {@code triggeringFlow}.
     *
     * <p>All work happens while holding {@code lockObject}. The outcome depends on the current state:
     * <ul>
     * <li>No valid subscriber: a warning is logged and nothing else happens.</li>
     * <li>{@code triggeringFlow} is not the active flow: the error is not dispatched; if the triggering flow
     * is non-null it is logged at debug level and cancelled.</li>
     * <li>{@code triggeringFlow} is the active flow: the error is logged according to its category, the current
     * flow is cancelled, {@code availableQueueSpace} is reset to zero, the error is handed to
     * {@code handleFlowError}, and finally {@code subscriber} and {@code flow} are cleared.</li>
     * </ul>
     *
     * @param triggeringFlow the flow that reported the error; may be {@code null}
     * @param t the error that was reported
     */
    private void errorOccurred(RecordFlow triggeringFlow, Throwable t) {
        synchronized (lockObject) {

            // Without a subscriber there is nobody to notify: log what we know and bail out.
            if (!hasValidSubscriber()) {
                if (!hasValidFlow()) {
                    log.warn(
                            "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." +
                                    " Last successful request details -- {}", streamAndShardId, lastSuccessfulRequestDetails);
                } else {
                    log.warn(
                            "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." +
                                    " Last successful request details -- {}", streamAndShardId, flow.connectionStartedAt,
                            flow.subscribeToShardId, lastSuccessfulRequestDetails);
                }
                return;
            }

            final ThrowableCategory category = throwableCategory(t);

            // An error from a flow that is not the active one is not forwarded; that flow is only cancelled.
            if (!isActiveFlow(triggeringFlow)) {
                if (triggeringFlow != null) {
                    log.debug(
                            "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- {} -> triggeringFlow wasn't the active flow.  Didn't dispatch error",
                            streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId,
                            category.throwableTypeString);
                    triggeringFlow.cancel();
                }
                return;
            }

            // What gets handed to handleFlowError; replaced below for read timeouts.
            Throwable errorToPropagate = t;

            if (flow != null) {
                final String logMessage = String.format(
                        "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." +
                                " Last successful request details -- %s",
                        streamAndShardId,
                        flow.connectionStartedAt,
                        flow.subscribeToShardId,
                        category.throwableTypeString,
                        lastSuccessfulRequestDetails);

                switch (category.throwableType) {
                case READ_TIMEOUT:
                    // Read timeouts are logged at debug level only and propagated as a
                    // RetryableRetrievalException built from the original error's cause.
                    log.debug(logMessage, t);
                    errorToPropagate = new RetryableRetrievalException(category.throwableTypeString,
                            (Exception) t.getCause());
                    break;
                case ACQUIRE_TIMEOUT:
                    // Acquire timeouts get their dedicated message in addition to the regular warning.
                    logAcquireTimeoutMessage(t);
                    log.warn(logMessage, t);
                    break;
                default:
                    log.warn(logMessage, t);
                    break;
                }

                flow.cancel();
            }

            log.debug("{}: availableQueueSpace zeroing from {}", streamAndShardId, availableQueueSpace);
            availableQueueSpace = 0;

            // A failure while dispatching the error is only logged, so the state reset below always runs.
            try {
                handleFlowError(errorToPropagate, triggeringFlow);
            } catch (Throwable dispatchFailure) {
                log.warn("{}: Exception while calling subscriber.onError. Last successful request details -- {}",
                        streamAndShardId, lastSuccessfulRequestDetails, dispatchFailure);
            }

            subscriber = null;
            flow = null;
        }
    }