in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java [106:132]
public void produce(final ConsumerRecords<byte[], byte[]> element)
        throws InterruptedException, WakeupException, ClosedException {
    // Contract, as implemented below:
    //  - rejects a null element up front (via checkNotNull),
    //  - waits on the lock while a previous element is still stored in 'next',
    //    unless a producer wake-up has been flagged,
    //  - throws WakeupException when it stopped waiting with 'next' still occupied,
    //  - throws ClosedException when the slot is free but an error has been recorded,
    //  - otherwise stores the element and notifies every thread waiting on the lock.
    checkNotNull(element);

    synchronized (lock) {
        // Re-check the condition after every wake-up; leave the loop as soon as the
        // slot is empty or a wake-up of the producer was requested.
        while (!(next == null || wakeupProducer)) {
            lock.wait();
        }

        // The wake-up request (if there was one) is consumed by this call.
        wakeupProducer = false;

        // Slot still occupied: the loop can only have ended because of a wake-up.
        if (next != null) {
            throw new WakeupException();
        }

        // A recorded error marks this as closed for the producer.
        if (error != null) {
            throw new ClosedException();
        }

        // Slot is free and no error is present: hand the element over and signal
        // all threads currently waiting on the lock.
        next = element;
        lock.notifyAll();
    }
}