private void flush()

in kafka-connector/src/main/java/com/google/pubsub/kafka/source/AckBatchingSubscriber.java [54:79]


  /**
   * Sends all pending acknowledgements to {@code underlying} as one batched call.
   *
   * <p>The entries queued in {@code toSend} are drained while holding this object's monitor; the
   * call to {@code underlying.ackMessages} is made after the monitor has been released. Every
   * drained entry's future is completed with the outcome of the batched call: the response value
   * on success, or the failure cause otherwise. A failure thrown synchronously by
   * {@code ackMessages} is reported through the futures in the same way as an asynchronous one.
   *
   * <p>Does nothing when there are no pending entries.
   */
  private void flush() {
    List<String> ackIds = new ArrayList<>();
    List<SettableApiFuture<Empty>> futures = new ArrayList<>();
    synchronized (this) {
      if (toSend.isEmpty()) {
        return;
      }
      toSend.forEach(pair -> {
        ackIds.addAll(pair.ids);
        futures.add(pair.future);
      });
      toSend.clear();
    }

    ApiFuture<Empty> response;
    try {
      response = underlying.ackMessages(ackIds);
    } catch (RuntimeException e) {
      // The entries were already removed from toSend, so if the call itself throws, nothing else
      // would ever complete these futures. Fail them here so waiters are not left hanging.
      futures.forEach(future -> future.setException(e));
      return;
    }

    // Fan the single batched result out to every future that contributed ids to the batch.
    ApiFutures.addCallback(response, new ApiFutureCallback<Empty>() {
      @Override
      public void onFailure(Throwable t) {
        futures.forEach(future -> future.setException(t));
      }

      @Override
      public void onSuccess(Empty result) {
        futures.forEach(future -> future.set(result));
      }
    }, MoreExecutors.directExecutor());
  }