in flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java [85:103]
/**
 * Acknowledges the given ids on the configured subscription.
 *
 * <p>Returns without sending anything when the list is empty. Otherwise the ids are handed to
 * {@code splitAckIds} and sent as one acknowledge request per resulting batch, each request
 * being issued with a 60 second deadline.
 *
 * @param acknowledgementIds ids to acknowledge; an empty list is a no-op
 */
public void acknowledge(List<String> acknowledgementIds) {
    if (acknowledgementIds.isEmpty()) {
        return;
    }

    // grpc servers won't accept acknowledge requests that are too large, so the ids go out in
    // batches: f0 is the batch for the next request, f1 holds the ids that still need splitting.
    // The loop ends as soon as splitAckIds hands back an empty batch.
    for (Tuple2<List<String>, List<String>> batches = splitAckIds(acknowledgementIds);
            !batches.f0.isEmpty();
            batches = splitAckIds(batches.f1)) {
        final List<String> currentBatch = batches.f0;
        final AcknowledgeRequest batchRequest =
                AcknowledgeRequest.newBuilder()
                        .setSubscription(projectSubscriptionName)
                        .addAllAckIds(currentBatch)
                        .build();
        stub.withDeadlineAfter(60, SECONDS).acknowledge(batchRequest);
    }
}