public void runFetcher()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [504:773]


    public void runFetcher() throws Exception {

        // check that we are running before proceeding
        if (!running) {
            return;
        }

        // ------------------------------------------------------------------------
        //  Procedures before starting the infinite while loop:
        // ------------------------------------------------------------------------

        //  1. check that there is at least one shard in the subscribed streams to consume from
        //     (can be done by checking if at least one value in
        //     subscribedStreamsToLastDiscoveredShardIds is not null)
        boolean hasShards = false;
        StringBuilder streamsWithNoShardsFound = new StringBuilder();
        for (Map.Entry<String, String> streamToLastDiscoveredShardEntry :
                subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
            if (streamToLastDiscoveredShardEntry.getValue() != null) {
                hasShards = true;
            } else {
                streamsWithNoShardsFound
                        .append(streamToLastDiscoveredShardEntry.getKey())
                        .append(", ");
            }
        }

        if (streamsWithNoShardsFound.length() != 0 && LOG.isWarnEnabled()) {
            LOG.warn(
                    "Subtask {} has failed to find any shards for the following subscribed streams: {}",
                    indexOfThisConsumerSubtask,
                    streamsWithNoShardsFound.toString());
        }

        if (!hasShards) {
            throw new RuntimeException(
                    "No shards can be found for all subscribed streams: " + streams);
        }

        //  2. start consuming any shard state we already have in subscribedShardsState up to
        //     this point; subscribedShardsState may already be seeded with values due to step 1,
        //     or explicitly added by the consumer using a restored state checkpoint
        for (int seededStateIndex = 0;
                seededStateIndex < subscribedShardsState.size();
                seededStateIndex++) {
            KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex);

            // only start a consuming thread if the seeded subscribed shard has not been completely
            // read already
            if (!seededShardState
                    .getLastProcessedSequenceNum()
                    .equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {

                if (LOG.isInfoEnabled()) {
                    LOG.info(
                            "Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
                            indexOfThisConsumerSubtask,
                            seededShardState.getStreamShardHandle().toString(),
                            seededShardState.getLastProcessedSequenceNum(),
                            seededStateIndex);
                }

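                // each seeded shard is consumed with its own cloned deserialization schema,
                // opened against a per-shard metric group (subtaskId / shardId / user)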
                StreamShardHandle streamShardHandle =
                        subscribedShardsState.get(seededStateIndex).getStreamShardHandle();
                KinesisDeserializationSchema<T> shardDeserializationSchema =
                        getClonedDeserializationSchema();
                shardDeserializationSchema.open(
                        RuntimeContextInitializationContextAdapters.deserializationAdapter(
                                runtimeContext,
                                // ignore the provided metric group
                                metricGroup ->
                                        consumerMetricGroup
                                                .addGroup(
                                                        "subtaskId",
                                                        String.valueOf(indexOfThisConsumerSubtask))
                                                .addGroup(
                                                        "shardId",
                                                        streamShardHandle.getShard().getShardId())
                                                .addGroup("user")));
                shardConsumersExecutor.submit(
                        createShardConsumer(
                                seededStateIndex,
                                streamShardHandle,
                                subscribedShardsState
                                        .get(seededStateIndex)
                                        .getLastProcessedSequenceNum(),
                                registerShardMetricGroup(
                                        consumerMetricGroup,
                                        subscribedShardsState.get(seededStateIndex)),
                                shardDeserializationSchema));
            }
        }

        // start periodic watermark emitter, if a watermark assigner was configured
        if (periodicWatermarkAssigner != null) {
            long periodicWatermarkIntervalMillis =
                    runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
            if (periodicWatermarkIntervalMillis > 0) {
                ProcessingTimeService timerService =
                        ((StreamingRuntimeContext) runtimeContext).getProcessingTimeService();
                LOG.info(
                        "Starting periodic watermark emitter with interval {}",
                        periodicWatermarkIntervalMillis);
                new PeriodicWatermarkEmitter(timerService, periodicWatermarkIntervalMillis).start();
                if (watermarkTracker != null) {
                    // setup global watermark tracking
                    long watermarkSyncMillis =
                            Long.parseLong(
                                    getConsumerConfiguration()
                                            .getProperty(
                                                    ConsumerConfigConstants.WATERMARK_SYNC_MILLIS,
                                                    Long.toString(
                                                            ConsumerConfigConstants
                                                                    .DEFAULT_WATERMARK_SYNC_MILLIS)));
                    watermarkTracker.setUpdateTimeoutMillis(
                            watermarkSyncMillis * 3); // synchronization latency
                    watermarkTracker.open(runtimeContext);
                    new WatermarkSyncCallback(timerService, watermarkSyncMillis).start();
                    // emit records ahead of watermark to offset synchronization latency
                    long lookaheadMillis =
                            Long.parseLong(
                                    getConsumerConfiguration()
                                            .getProperty(
                                                    ConsumerConfigConstants
                                                            .WATERMARK_LOOKAHEAD_MILLIS,
                                                    Long.toString(0)));
                    recordEmitter.setMaxLookaheadMillis(
                            Math.max(lookaheadMillis, watermarkSyncMillis * 3));

                    // record emitter depends on periodic watermark
                    // it runs in a separate thread since main thread is used for discovery
                    Runnable recordEmitterRunnable =
                            new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        recordEmitter.run();
                                    } catch (Throwable error) {
                                        // report the error that terminated the emitter
                                        // loop to the source thread
                                        stopWithError(error);
                                    }
                                }
                            };

                    Thread thread = new Thread(recordEmitterRunnable);
                    thread.setName("recordEmitter-" + runtimeContext.getTaskNameWithSubtasks());
                    thread.setDaemon(true);
                    thread.start();
                }
            }
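            // how long a shard may go without new records before it is considered idle for
            // per-shard watermark generation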
            this.shardIdleIntervalMillis =
                    Long.parseLong(
                            getConsumerConfiguration()
                                    .getProperty(
                                            ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS,
                                            Long.toString(
                                                    ConsumerConfigConstants
                                                            .DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));
        }

        // ------------------------------------------------------------------------

        // finally, start the infinite shard discovery and consumer launching loop;
        // we will escape from this loop only when shutdownFetcher() or stopWithError() is called
        // TODO: have this thread emit the records for tracking backpressure

        final long discoveryIntervalMillis =
                Long.parseLong(
                        configProps.getProperty(
                                ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
                                Long.toString(
                                        ConsumerConfigConstants
                                                .DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));

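        // a subtask that currently owns no shards is marked idle so that it does not hold
        // back watermark progress for the rest of the job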
        if (this.numberOfActiveShards.get() == 0) {
            LOG.info(
                    "Subtask {} has no active shards to read on startup; marking the subtask as temporarily idle ...",
                    indexOfThisConsumerSubtask);
            sourceContext.markAsTemporarilyIdle();
        }

        while (running) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Subtask {} is trying to discover new shards that were created due to resharding ...",
                        indexOfThisConsumerSubtask);
            }
            List<StreamShardHandle> newShardsDueToResharding = discoverNewShardsToSubscribe();

            for (StreamShardHandle shard : newShardsDueToResharding) {
                // since there may be delay in discovering a new shard, all new shards due to
                // resharding should be read starting from the earliest record possible
                KinesisStreamShardState newShardState =
                        new KinesisStreamShardState(
                                convertToStreamShardMetadata(shard),
                                shard,
                                SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
                int newStateIndex = registerNewSubscribedShardState(newShardState);

                if (LOG.isInfoEnabled()) {
                    LOG.info(
                            "Subtask {} has discovered a new shard {} due to resharding, and will start consuming "
                                    + "the shard from sequence number {} with ShardConsumer {}",
                            indexOfThisConsumerSubtask,
                            newShardState.getStreamShardHandle().toString(),
                            newShardState.getLastProcessedSequenceNum(),
                            newStateIndex);
                }

                StreamShardHandle streamShardHandle = newShardState.getStreamShardHandle();
                KinesisDeserializationSchema<T> shardDeserializationSchema =
                        getClonedDeserializationSchema();
                shardDeserializationSchema.open(
                        RuntimeContextInitializationContextAdapters.deserializationAdapter(
                                runtimeContext,
                                // ignore the provided metric group
                                metricGroup ->
                                        consumerMetricGroup
                                                .addGroup(
                                                        "subtaskId",
                                                        String.valueOf(indexOfThisConsumerSubtask))
                                                .addGroup(
                                                        "shardId",
                                                        streamShardHandle.getShard().getShardId())
                                                .addGroup("user")));
                shardConsumersExecutor.submit(
                        createShardConsumer(
                                newStateIndex,
                                newShardState.getStreamShardHandle(),
                                newShardState.getLastProcessedSequenceNum(),
                                registerShardMetricGroup(consumerMetricGroup, newShardState),
                                shardDeserializationSchema));
            }

            // check the running flag again so that we don't start the discovery sleep
            // interval if it was set to false partway through the loop body
            if (running && discoveryIntervalMillis != 0) {
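                // cancelFuture doubles as a cancellable sleep: get() times out after the
                // discovery interval in normal operation and returns early when the fetcher
                // is shut down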
                try {
                    cancelFuture.get(discoveryIntervalMillis, TimeUnit.MILLISECONDS);
                    LOG.debug("Cancelled discovery");
                } catch (TimeoutException iex) {
                    // timeout is expected when fetcher is not cancelled
                }
            }
        }

        // make sure all resources have been terminated before leaving
        try {
            awaitTermination();
        } catch (InterruptedException ie) {
            // If there is an original exception, preserve it, since that's more important/useful.
            this.error.compareAndSet(null, ie);
        }

        // any error thrown in the shard consumer threads will be thrown to the main thread
        Throwable throwable = this.error.get();
        if (throwable != null) {
            if (throwable instanceof Exception) {
                throw (Exception) throwable;
            } else if (throwable instanceof Error) {
                throw (Error) throwable;
            } else {
                throw new Exception(throwable);
            }
        }
    }
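
The main loop above leans on two small concurrency patterns that are easy to miss in the full method: cancelFuture.get(timeout) doubles as a sleep that a shutdown can cut short, and the AtomicReference error field guarantees that only the first failure (from a shard consumer, the record emitter, or an interrupt) is rethrown on the source thread. The following is a minimal, self-contained sketch of those two patterns only; the class and method names (FetcherLoopSketch, runLoop, shutdown) are illustrative and not part of the connector.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

public class FetcherLoopSketch {

    private final CompletableFuture<Void> cancelFuture = new CompletableFuture<>();
    private final AtomicReference<Throwable> error = new AtomicReference<>();
    private volatile boolean running = true;

    /** Called from worker threads; only the first error wins the compareAndSet from null. */
    public void stopWithError(Throwable t) {
        if (error.compareAndSet(null, t)) {
            shutdown();
        }
    }

    /** Completing the future wakes the main loop immediately instead of after the timeout. */
    public void shutdown() {
        running = false;
        cancelFuture.complete(null);
    }

    public void runLoop(long discoveryIntervalMillis) throws Exception {
        while (running) {
            // ... shard discovery work would happen here ...
            if (running && discoveryIntervalMillis != 0) {
                try {
                    // cancellable sleep: times out in normal operation,
                    // returns immediately once shutdown() completes the future
                    cancelFuture.get(discoveryIntervalMillis, TimeUnit.MILLISECONDS);
                } catch (TimeoutException expected) {
                    // no cancellation during this interval
                }
            }
        }
        // rethrow whatever terminated the workers, preserving Exception/Error types
        Throwable t = error.get();
        if (t instanceof Exception) {
            throw (Exception) t;
        } else if (t instanceof Error) {
            throw (Error) t;
        } else if (t != null) {
            throw new Exception(t);
        }
    }
}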