in pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java [74:102]
/**
 * Starts a read-to-end pass, or joins the one already in flight, and reports the outcome
 * through {@code future}.
 *
 * <p>At most one pass is tracked at a time in {@code outstandingReadToEnd}. If that field is
 * non-null, no new pass is started; {@code future} is completed with the same value or failure
 * as the outstanding one. Otherwise {@code future} becomes the outstanding pass, the producer is
 * flushed, and on a successful flush {@code future} is handed to {@code checkAndReadNext}, which
 * is expected to complete it (defined elsewhere in this class).
 *
 * <p>Any failure while starting the pass completes {@code future} exceptionally, so that
 * {@code outstandingReadToEnd} is always cleared and later callers are never chained onto a
 * future that cannot complete.
 *
 * @param future completed normally when the pass finishes, or exceptionally if the flush or the
 *               read fails
 */
void readToEnd(CompletableFuture<Void> future) {
    synchronized (this) {
        if (outstandingReadToEnd != null) {
            // A pass is already tracked: mirror its outcome onto the caller's future
            // instead of issuing a second flush/read.
            outstandingReadToEnd.whenComplete((result, cause) -> {
                if (null != cause) {
                    future.completeExceptionally(cause);
                } else {
                    future.complete(result);
                }
            });
            // return if the outstanding read has been issued
            return;
        } else {
            outstandingReadToEnd = future;
            // Release the slot once this pass is over, whether it succeeded or failed.
            future.whenComplete((result, cause) -> {
                synchronized (PulsarOffsetBackingStore.this) {
                    outstandingReadToEnd = null;
                }
            });
        }
    }
    try {
        producer.flushAsync().whenComplete((ignored, cause) -> {
            if (null != cause) {
                future.completeExceptionally(cause);
                return;
            }
            try {
                checkAndReadNext(future);
            } catch (RuntimeException e) {
                // An exception thrown here would otherwise be captured only by the stage
                // returned from whenComplete, which is discarded, leaving future pending.
                future.completeExceptionally(e);
            }
        });
    } catch (RuntimeException e) {
        // The flush could not even be started. Fail the future so the outstanding slot is
        // released, then rethrow so the caller still sees the original exception.
        future.completeExceptionally(e);
        throw e;
    }
}