void readToEnd()

in pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java [74:102]


    void readToEnd(CompletableFuture<Void> future) {
        synchronized (this) {
            if (outstandingReadToEnd != null) {
                outstandingReadToEnd.whenComplete((result, cause) -> {
                    if (null != cause) {
                        future.completeExceptionally(cause);
                    } else {
                        future.complete(result);
                    }
                });
                // return if the outstanding read has been issued
                return;
            } else {
                outstandingReadToEnd = future;
                future.whenComplete((result, cause) -> {
                    synchronized (PulsarOffsetBackingStore.this) {
                        outstandingReadToEnd = null;
                    }
                });
            }
        }
        producer.flushAsync().whenComplete((ignored, cause) -> {
            if (null != cause) {
                future.completeExceptionally(cause);
            } else {
                checkAndReadNext(future);
            }
        });
    }