in kafka-connector/src/main/java/com/google/pubsub/kafka/source/StreamingPullSubscriber.java [124:135]
private synchronized ApiFuture<Void> onData() {
if (error.isPresent()) {
return ApiFutures.immediateFailedFuture(error.get());
}
if (!messages.isEmpty()) {
return ApiFutures.immediateFuture(null);
}
if (!notification.isPresent()) {
notification = Optional.of(SettableApiFuture.create());
}
return notification.get();
}