in flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java [140:160]
private void processMessage(
SourceContext<OUT> sourceContext,
List<ReceivedMessage> messages,
PubSubCollector collector)
throws Exception {
rateLimiter.acquire(messages.size());
synchronized (sourceContext.getCheckpointLock()) {
for (ReceivedMessage message : messages) {
acknowledgeOnCheckpoint.addAcknowledgeId(message.getAckId());
PubsubMessage pubsubMessage = message.getMessage();
deserializationSchema.deserialize(pubsubMessage, collector);
if (collector.isEndOfStreamSignalled()) {
cancel();
return;
}
}
}
}