public void open()

in flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java [93:116]


    /**
     * Initializes this source before it starts running.
     *
     * <p>In order, this method: delegates to {@code super.open}, rejects execution when
     * checkpointing is not enabled, registers the {@code PubSubMessagesProcessedNotAcked} gauge,
     * configures and opens the rate limiter, opens the deserialization schema, creates the
     * PubSub subscriber, and finally marks the source as running.
     *
     * @param configuration the configuration forwarded to {@code super.open}
     * @throws IllegalArgumentException if checkpointing is not enabled for the runtime context
     * @throws Exception if any of the initialization steps fails
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        if (hasNoCheckpointingEnabled(getRuntimeContext())) {
            throw new IllegalArgumentException(
                    "The PubSubSource REQUIRES Checkpointing to be enabled and "
                            + "the checkpointing frequency must be MUCH lower than the PubSub timeout for it to retry a message.");
        }

        // Expose the number of messages that were processed but not yet acknowledged.
        getRuntimeContext()
                .getMetricGroup()
                .gauge("PubSubMessagesProcessedNotAcked", this::getOutstandingMessagesToAck);

        // convert per-subtask-limit to global rate limit, as FlinkConnectorRateLimiter::setRate
        // expects a global rate limit.
        // Widen to long BEFORE multiplying: a large per-subtask limit combined with a high
        // parallelism could otherwise wrap around in 32-bit arithmetic and yield a bogus
        // (possibly negative) rate.
        // NOTE(review): assumes setRate accepts a long -- confirm against the rate limiter API.
        rateLimiter.setRate(
                (long) messagePerSecondRateLimit
                        * getRuntimeContext().getNumberOfParallelSubtasks());
        rateLimiter.open(getRuntimeContext());

        // User-defined metrics of the deserialization schema are placed in a "user" sub-group.
        deserializationSchema.open(
                RuntimeContextInitializationContextAdapters.deserializationAdapter(
                        getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));

        createAndSetPubSubSubscriber();
        // Flag is set last, only after every initialization step above has succeeded.
        this.isRunning = true;
    }