public void run()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java [302:399]


    public void run(SourceContext<T> sourceContext) throws Exception {
        // Overview: create a fetcher, seed it with a starting sequence number for every shard
        // returned by the initial discovery, then run the fetcher until it terminates.

        // Every subtask gets a fetcher, even one that initially has no shards to subscribe to:
        // fetchers continuously poll for changes in the shard list, so any subtask can
        // potentially receive new shards later on.
        KinesisDataFetcher<T> shardFetcher =
                createFetcher(
                        streams, sourceContext, getRuntimeContext(), configProps, deserializer);

        // Initial discovery.
        List<StreamShardHandle> discoveredShards = shardFetcher.discoverNewShardsToSubscribe();

        for (StreamShardHandle shardHandle : discoveredShards) {
            // Wrapper used as the lookup key into the restored sequence numbers.
            StreamShardMetadata.EquivalenceWrapper shardKey =
                    new StreamShardMetadata.EquivalenceWrapper(
                            KinesisDataFetcher.convertToStreamShardMetadata(shardHandle));

            if (sequenceNumsToRestore == null) {
                // Starting fresh: the configured start position becomes the initial state.
                String configuredPosition =
                        configProps.getProperty(
                                ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                                ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION);
                SentinelSequenceNumber initialSentinel =
                        InitialPosition.valueOf(configuredPosition).toSentinelSequenceNumber();

                KinesisStreamShardState freshState =
                        new KinesisStreamShardState(
                                shardKey.getShardMetadata(), shardHandle, initialSentinel.get());
                shardFetcher.registerNewSubscribedShardState(freshState);

                if (LOG.isInfoEnabled()) {
                    LOG.info(
                            "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            shardHandle.toString(),
                            initialSentinel.get());
                }
            } else if (sequenceNumsToRestore.containsKey(shardKey)) {
                // The shard was already seen and is present in the restored state, so resume
                // from the sequence number stored there.
                KinesisStreamShardState restoredState =
                        new KinesisStreamShardState(
                                shardKey.getShardMetadata(),
                                shardHandle,
                                sequenceNumsToRestore.get(shardKey));
                shardFetcher.registerNewSubscribedShardState(restoredState);

                if (LOG.isInfoEnabled()) {
                    LOG.info(
                            "Subtask {} is seeding the fetcher with restored shard {},"
                                    + " starting state set to the restored sequence number {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            shardHandle.toString(),
                            sequenceNumsToRestore.get(shardKey));
                }
            } else {
                // State was restored but does not contain this shard, i.e. it was not
                // discovered in the previous run; consume it from the beginning.
                KinesisStreamShardState earliestState =
                        new KinesisStreamShardState(
                                shardKey.getShardMetadata(),
                                shardHandle,
                                SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
                shardFetcher.registerNewSubscribedShardState(earliestState);

                if (LOG.isInfoEnabled()) {
                    LOG.info(
                            "Subtask {} is seeding the fetcher with new discovered shard {},"
                                    + " starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            shardHandle.toString());
                }
            }
        }

        // Only start the fetcher if we are still running; otherwise fall through and return
        // without exposing or starting it.
        if (running) {
            // Expose the fetcher from this point on, so that state snapshots can be taken from
            // the fetcher's state holders.
            this.fetcher = shardFetcher;

            // Run the fetcher loop. It stops only when cancel() or close() is called, or when
            // a thread created by the fetcher throws an error.
            shardFetcher.runFetcher();

            // Make sure the fetcher has terminated before fully closing.
            shardFetcher.awaitTermination();
            sourceContext.close();
        }
    }