in amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java [162:205]
/**
 * Advances this consumer's lifecycle by at most one step.
 *
 * <p>The call is a no-op when {@code isShutdown()} is true or when a previously started state
 * change ({@code stateChangeFuture}) has not completed yet. Otherwise:
 * <ul>
 *   <li>if {@code isShutdownRequested()} is true, the future returned by
 *       {@code shutdownComplete()} becomes the pending state change;</li>
 *   <li>else, while {@code needsInitialization} is set, a previous state change that completed
 *       with {@code true} triggers {@code subscribe()} and clears the flag, after which the
 *       future returned by {@code initializeComplete()} becomes the pending state change.</li>
 * </ul>
 * Finally, when the current state is {@code PROCESSING}, {@code healthCheck()} is run and any
 * {@link Error} it returns is rethrown.
 *
 * @throws RuntimeException wrapping the {@link ExecutionException} raised when the previous
 *         state change completed exceptionally
 * @throws Error if {@code healthCheck()} returns an {@link Error} while in the PROCESSING state
 */
public void executeLifecycle() {
    if (isShutdown()) {
        return;
    }
    // A state change is still in flight; wait for it to finish before starting another one.
    if (stateChangeFuture != null && !stateChangeFuture.isDone()) {
        return;
    }
    try {
        if (isShutdownRequested()) {
            stateChangeFuture = shutdownComplete();
        } else if (needsInitialization) {
            // The guard above has already established that a non-null future is done here.
            if (stateChangeFuture != null && stateChangeFuture.get()) {
                // Task rejection during the subscribe() call will not be propagated back as it is not
                // executed in the context of the Scheduler thread. Hence we should not assume the
                // subscription will always be successful.
                subscribe();
                needsInitialization = false;
            }
            stateChangeFuture = initializeComplete();
        }
    } catch (InterruptedException e) {
        // The interruption itself is left to the scheduler to deal with. Restore the interrupt
        // status so it remains observable to the calling thread instead of being silently cleared.
        Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    } catch (RejectedExecutionException e) {
        // It is possible the tasks submitted to the executor service by the Scheduler thread might get
        // rejected due to various reasons. Such failed executions must be captured and marked as failure
        // to prevent the state transitions.
        taskOutcome = TaskOutcome.FAILURE;
    }
    if (ConsumerStates.ShardConsumerState.PROCESSING.equals(currentState.state())) {
        Throwable t = healthCheck();
        // Only Errors are escalated to the caller; any other Throwable returned is not rethrown here.
        if (t instanceof Error) {
            throw (Error) t;
        }
    }
}