public List getBatchBlocking()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java [427:456]


    public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException {
        if (timeoutMillis == 0L) {
            // wait forever case
            return getBatchBlocking();
        } else if (timeoutMillis < 0L) {
            throw new IllegalArgumentException("invalid timeout");
        }

        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;

        lock.lock();
        try {
            while (open && elements.isEmpty() && timeoutMillis > 0) {
                nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
                timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
            }

            if (!open) {
                throw new IllegalStateException("queue is closed");
            } else if (elements.isEmpty()) {
                return Collections.emptyList();
            } else {
                ArrayList<E> result = new ArrayList<>(elements);
                elements.clear();
                return result;
            }
        } finally {
            lock.unlock();
        }
    }