in kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java [348:373]
/**
 * Waits for the outstanding futures tracked for each of the given partitions to complete.
 *
 * <p>For every {@link TopicPartition} key in {@code partitionOffsets}, the futures recorded in
 * {@code allOutstandingFutures} under that topic and partition number are awaited. Partitions
 * with no recorded futures are skipped. A partition's future list is emptied once its wait
 * finishes, whether it succeeded or failed. After all partitions have been processed without
 * error, {@code allOutstandingFutures} itself is cleared; if a wait fails, the exception
 * propagates before that final clear runs.
 *
 * @param partitionOffsets the partitions to flush; only the keys are read, the offset values are
 *     not used by this method
 * @throws RuntimeException wrapping the underlying cause if waiting on a partition's futures
 *     fails or the waiting thread is interrupted (the interrupt flag is restored first)
 */
public void flush(Map<TopicPartition, OffsetAndMetadata> partitionOffsets) {
  log.debug("Flushing...");
  // Process results of all the outstanding futures specified by each TopicPartition.
  for (TopicPartition topicPartition : partitionOffsets.keySet()) {
    log.trace("Received flush for partition " + topicPartition);
    Map<Integer, OutstandingFuturesForPartition> outstandingFuturesForTopic =
        allOutstandingFutures.get(topicPartition.topic());
    if (outstandingFuturesForTopic == null) {
      continue;
    }
    OutstandingFuturesForPartition outstandingFutures =
        outstandingFuturesForTopic.get(topicPartition.partition());
    if (outstandingFutures == null) {
      continue;
    }
    try {
      // Block until the combined future over this partition's futures resolves.
      ApiFutures.allAsList(outstandingFutures.futures).get();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so code further up the stack can still observe the
      // interruption; wrapping the exception alone would silently discard it.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (Exception e) {
      // Kept deliberately broad so every other failure (including unchecked ones) surfaces
      // to the caller as a RuntimeException with the original exception as its cause.
      throw new RuntimeException(e);
    } finally {
      // Drop this partition's futures regardless of the outcome.
      outstandingFutures.futures.clear();
    }
  }
  allOutstandingFutures.clear();
}