in flink-connector/flink-connector-gcp-pubsub/src/main/java/com/google/pubsub/flink/internal/source/reader/PubSubSplitReader.java [56:83]
private ApiFuture<Void> notifyDataAvailable() {
SettableApiFuture<Void> future = SettableApiFuture.create();
subscribers
.values()
.forEach(
s -> {
ApiFuture<Void> notification = s.notifyDataAvailable();
ApiFutures.addCallback(
notification,
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable t) {
// Ignore exceptions caused by wakeups
if (t instanceof PubSubNotifyingPullSubscriber.SubscriberWakeupException) {
future.set(null);
}
future.setException(t);
}
@Override
public void onSuccess(Void result) {
future.set(null);
}
},
MoreExecutors.directExecutor());
});
return future;
}