in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [798:831]
public void shutdownFetcher() {
LOG.info(
"Starting shutdown of shard consumer threads and AWS SDK resources of subtask {} ...",
indexOfThisConsumerSubtask,
error.get());
running = false;
try {
try {
deregisterStreamConsumer();
} catch (Exception e) {
LOG.warn("Encountered exception deregistering stream consumers", e);
}
try {
closeRecordPublisherFactory();
} catch (Exception e) {
LOG.warn("Encountered exception closing record publisher factory", e);
}
} finally {
gracefulShutdownShardConsumers();
cancelFuture.complete(null);
if (watermarkTracker != null) {
watermarkTracker.close();
}
this.recordEmitter.stop();
}
LOG.info(
"Shutting down the shard consumer threads of subtask {} ...",
indexOfThisConsumerSubtask);
}