in kafka-connector/src/main/java/com/google/pubsub/kafka/source/AckBatchingSubscriber.java [54:79]
/**
 * Sends all pending acknowledgements as one batched request.
 *
 * <p>Drains {@code toSend} while holding this object's monitor, collecting every queued ack id
 * and its associated future. The single {@code underlying.ackMessages} call is then issued
 * outside the lock, and each drained future is completed with that call's outcome. Returns
 * without sending anything when no acks are pending.
 *
 * <p>If {@code underlying.ackMessages} throws synchronously, the drained futures are failed with
 * that exception before it is rethrown to the caller.
 */
private void flush() {
  List<String> ackIds = new ArrayList<>();
  List<SettableApiFuture<Empty>> futures = new ArrayList<>();
  synchronized (this) {
    if (toSend.isEmpty()) {
      return;
    }
    toSend.forEach(pair -> {
      ackIds.addAll(pair.ids);
      futures.add(pair.future);
    });
    toSend.clear();
  }
  // The request is issued outside the synchronized block so the monitor is not held during it.
  ApiFuture<Empty> response;
  try {
    response = underlying.ackMessages(ackIds);
  } catch (RuntimeException e) {
    // The batch was already removed from toSend and no callback has been registered yet, so
    // these futures would otherwise never complete. Fail them, then propagate as before.
    futures.forEach(future -> future.setException(e));
    throw e;
  }
  ApiFutures.addCallback(response, new ApiFutureCallback<Empty>() {
    @Override
    public void onFailure(Throwable t) {
      // One failed batch request fails every ack that was part of it.
      futures.forEach(future -> future.setException(t));
    }

    @Override
    public void onSuccess(Empty result) {
      futures.forEach(future -> future.set(result));
    }
  }, MoreExecutors.directExecutor());
}