in flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java [93:116]
public void open(Configuration configuration) throws Exception {
super.open(configuration);
if (hasNoCheckpointingEnabled(getRuntimeContext())) {
throw new IllegalArgumentException(
"The PubSubSource REQUIRES Checkpointing to be enabled and "
+ "the checkpointing frequency must be MUCH lower than the PubSub timeout for it to retry a message.");
}
getRuntimeContext()
.getMetricGroup()
.gauge("PubSubMessagesProcessedNotAcked", this::getOutstandingMessagesToAck);
// convert per-subtask-limit to global rate limit, as FlinkConnectorRateLimiter::setRate
// expects a global rate limit.
rateLimiter.setRate(
messagePerSecondRateLimit * getRuntimeContext().getNumberOfParallelSubtasks());
rateLimiter.open(getRuntimeContext());
deserializationSchema.open(
RuntimeContextInitializationContextAdapters.deserializationAdapter(
getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
createAndSetPubSubSubscriber();
this.isRunning = true;
}