in kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java [448:464]
/**
 * Stops this task, shutting down the PubSub publisher if one exists.
 *
 * <p>Shutdown is best-effort: the publisher is asked to shut down and this method then waits up
 * to {@code maxShutdownTimeoutMs} milliseconds for it to terminate. A timeout is logged as a
 * warning and any exception is logged as an error; neither is propagated to the caller. If the
 * calling thread is interrupted during shutdown, its interrupt status is restored so that code
 * further up the stack can still observe the interruption.
 */
public void stop() {
  log.info("Stopping CloudPubSubSinkTask");
  if (publisher == null) {
    // Nothing to shut down.
    return;
  }
  log.info("Shutting down PubSub publisher");
  try {
    publisher.shutdown();
    boolean terminated = publisher.awaitTermination(maxShutdownTimeoutMs, TimeUnit.MILLISECONDS);
    if (!terminated) {
      log.warn(String.format("PubSub publisher did not terminate cleanly in %d ms", maxShutdownTimeoutMs));
    }
  } catch (Exception e) {
    // There is not much we can do here besides logging it as an error
    log.error("An exception occurred while shutting down PubSub publisher", e);
    if (e instanceof InterruptedException) {
      // Catching InterruptedException clears the thread's interrupt flag; set it again so the
      // interruption is not silently swallowed by this broad catch.
      Thread.currentThread().interrupt();
    }
  }
}