in amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java [314:378]
/**
 * Handles an error reported by a record flow. The whole method runs while holding {@code lockObject}.
 *
 * <ul>
 *   <li>If there is no valid subscriber, the situation is logged at WARN and the error is dropped.</li>
 *   <li>If {@code triggeringFlow} is the active flow, the error is categorized and logged, the current
 *       {@code flow} is cancelled, {@code availableQueueSpace} is set to zero, the error is passed to
 *       {@code handleFlowError}, and finally {@code subscriber} and {@code flow} are cleared.</li>
 *   <li>Otherwise the error is not dispatched; a non-null {@code triggeringFlow} is cancelled.</li>
 * </ul>
 *
 * <p>For read timeouts the error passed on is a new {@link RetryableRetrievalException} carrying the
 * cause of the original throwable; for every other category the original throwable is passed on as is.
 *
 * @param triggeringFlow the flow that reported the error; may be {@code null}
 * @param t the error that was reported
 */
private void errorOccurred(RecordFlow triggeringFlow, Throwable t) {
    synchronized (lockObject) {
        if (!hasValidSubscriber()) {
            // Nobody to hand the error to: log what is known about the current flow and drop it.
            if (hasValidFlow()) {
                log.warn(
                        "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." +
                                " Last successful request details -- {}", streamAndShardId, flow.connectionStartedAt,
                        flow.subscribeToShardId, lastSuccessfulRequestDetails);
            } else {
                log.warn(
                        "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." +
                                " Last successful request details -- {}", streamAndShardId, lastSuccessfulRequestDetails);
            }
            return;
        }

        Throwable propagationThrowable = t;
        ThrowableCategory category = throwableCategory(propagationThrowable);

        if (isActiveFlow(triggeringFlow)) {
            if (flow != null) {
                String logMessage = String.format(
                        "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." +
                                " Last successful request details -- %s", streamAndShardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
                switch (category.throwableType) {
                    case READ_TIMEOUT: {
                        // Read timeouts are only logged at DEBUG and are re-wrapped as retryable.
                        log.debug(logMessage, propagationThrowable);
                        // getCause() may return any Throwable (or null). A blind cast to Exception would
                        // throw ClassCastException for an Error cause and abort this method before the
                        // flow is cancelled and the error is dispatched, so wrap non-Exception causes.
                        Throwable cause = propagationThrowable.getCause();
                        Exception retryCause = (cause == null || cause instanceof Exception)
                                ? (Exception) cause
                                : new RuntimeException(cause);
                        propagationThrowable = new RetryableRetrievalException(category.throwableTypeString,
                                retryCause);
                        break;
                    }
                    case ACQUIRE_TIMEOUT:
                        logAcquireTimeoutMessage(t);
                        //
                        // Fall through is intentional here as we still want to log the details of the exception
                        //
                    default:
                        log.warn(logMessage, propagationThrowable);
                }
                flow.cancel();
            }

            log.debug("{}: availableQueueSpace zeroing from {}", streamAndShardId, availableQueueSpace);
            availableQueueSpace = 0;

            try {
                handleFlowError(propagationThrowable, triggeringFlow);
            } catch (Throwable innerThrowable) {
                // A failure while dispatching must not prevent the state reset below.
                log.warn("{}: Exception while calling subscriber.onError. Last successful request details -- {}",
                        streamAndShardId, lastSuccessfulRequestDetails, innerThrowable);
            }
            // Clear the subscription state whether or not the dispatch above succeeded.
            subscriber = null;
            flow = null;
        } else {
            // The error came from a flow that is not the active one: do not dispatch it,
            // just cancel that flow if there is one.
            if (triggeringFlow != null) {
                log.debug(
                        "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- {} -> triggeringFlow wasn't the active flow. Didn't dispatch error",
                        streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId,
                        category.throwableTypeString);
                triggeringFlow.cancel();
            }
        }
    }
}