public void run()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/FlinkKinesisConsumer.java [284:355]


	public void run(SourceContext<T> sourceContext) throws Exception {
		// Entry point of the source. It creates a fetcher, registers a starting state for every shard
		// returned by the initial discovery, and then (if still running) hands control to the fetcher
		// loop, waits for the fetcher to terminate and closes the given source context.

		// every subtask gets a fetcher, even when it has no shards to read at startup; the fetcher
		// keeps polling for changes in the shard list, so shards may be handed to this subtask later
		KinesisDataFetcher<T> shardFetcher =
			createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

		// first discovery round: everything returned here is seeded into the fetcher below
		List<StreamShardHandle> discoveredShards = shardFetcher.discoverNewShardsToSubscribe();

		for (StreamShardHandle shardHandle : discoveredShards) {
			// the wrapper is the key type used to look the shard up in the restored state
			StreamShardMetadata.EquivalenceWrapper stateKey =
				new StreamShardMetadata.EquivalenceWrapper(KinesisDataFetcher.convertToStreamShardMetadata(shardHandle));
			StreamShardMetadata shardMetadata = stateKey.getShardMetadata();

			if (sequenceNumsToRestore == null) {
				// nothing was restored: start from the position configured in the consumer properties
				// (falling back to the default initial position when the property is absent)
				String configuredPosition = configProps.getProperty(
					ConsumerConfigConstants.STREAM_INITIAL_POSITION,
					ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION);
				SentinelSequenceNumber initialSentinel =
					InitialPosition.valueOf(configuredPosition).toSentinelSequenceNumber();

				KinesisStreamShardState freshState =
					new KinesisStreamShardState(shardMetadata, shardHandle, initialSentinel.get());
				shardFetcher.registerNewSubscribedShardState(freshState);

				if (LOG.isInfoEnabled()) {
					LOG.info(
						"Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
						getRuntimeContext().getIndexOfThisSubtask(),
						shardHandle.toString(),
						initialSentinel.get());
				}
			} else if (sequenceNumsToRestore.containsKey(stateKey)) {
				// the shard is part of the restored state: resume from the sequence number stored there
				KinesisStreamShardState restoredState =
					new KinesisStreamShardState(shardMetadata, shardHandle, sequenceNumsToRestore.get(stateKey));
				shardFetcher.registerNewSubscribedShardState(restoredState);

				if (LOG.isInfoEnabled()) {
					LOG.info(
						"Subtask {} is seeding the fetcher with restored shard {}, "
							+ "starting state set to the restored sequence number {}",
						getRuntimeContext().getIndexOfThisSubtask(),
						shardHandle.toString(),
						sequenceNumsToRestore.get(stateKey));
				}
			} else {
				// state was restored but does not contain this shard, so it was not known to the
				// previous run: read it from the earliest available position
				KinesisStreamShardState earliestState = new KinesisStreamShardState(
					shardMetadata,
					shardHandle,
					SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
				shardFetcher.registerNewSubscribedShardState(earliestState);

				if (LOG.isInfoEnabled()) {
					LOG.info(
						"Subtask {} is seeding the fetcher with new discovered shard {}, "
							+ "starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
						getRuntimeContext().getIndexOfThisSubtask(),
						shardHandle.toString());
				}
			}
		}

		// only start the fetcher when the source is still flagged as running; otherwise fall
		// through and return without publishing or starting it
		if (running) {
			// publish the fetcher on the instance from here on, so that state snapshots
			// can be taken from the fetcher's state holders
			this.fetcher = shardFetcher;

			// run the fetcher loop; it stops only when cancel() or close() is called,
			// or when a thread created by the fetcher throws an error
			shardFetcher.runFetcher();

			// make sure the fetcher has terminated before closing the source context
			shardFetcher.awaitTermination();
			sourceContext.close();
		}
	}