public void produce()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java [106:132]


    /**
     * Hands the given batch of records over by storing it in {@code next}.
     *
     * <p>While holding the monitor of {@code lock}, this method waits for as long as
     * {@code next} is occupied and no producer wakeup has been requested. Once it stops
     * waiting, the {@code wakeupProducer} flag is cleared and exactly one of three outcomes
     * follows:
     *
     * <ul>
     *   <li>{@code next} is still occupied: the wait ended because of a wakeup request, so a
     *       {@link WakeupException} is thrown and the element is not stored;
     *   <li>{@code error} is set: the element is rejected with a {@link ClosedException};
     *   <li>otherwise the element is stored in {@code next} and all threads waiting on
     *       {@code lock} are notified.
     * </ul>
     *
     * @param element the records to hand over; passed through {@code checkNotNull} before the
     *     lock is taken (presumably rejecting {@code null} - confirm against that helper)
     * @throws InterruptedException if the thread is interrupted while waiting on {@code lock}
     * @throws WakeupException if the wait ended while {@code next} was still occupied
     * @throws ClosedException if {@code error} is set when the element would be stored
     */
    public void produce(final ConsumerRecords<byte[], byte[]> element)
            throws InterruptedException, WakeupException, ClosedException {

        checkNotNull(element);

        synchronized (lock) {
            // Park until the slot is free or somebody asked the producer to wake up.
            // The condition is re-evaluated after every return from wait().
            while (!(next == null || wakeupProducer)) {
                lock.wait();
            }

            // The wakeup request (if any) is consumed here, before the state is inspected.
            wakeupProducer = false;

            // Slot still taken: the only way out of the loop was a wakeup request.
            final boolean slotOccupied = next != null;
            if (slotOccupied) {
                throw new WakeupException();
            }

            // A recorded error means this is closed for the producer.
            if (error != null) {
                throw new ClosedException();
            }

            // Open and empty: publish the element and signal the waiting threads.
            next = element;
            lock.notifyAll();
        }
    }