public void run()

in flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java [323:339]


    /**
     * Main consume loop of the source: repeatedly fetches the next delivery from {@code consumer}
     * and hands it to {@code processMessage} until {@code running} is cleared or the collector
     * reports that the end of the stream was signalled.
     *
     * @param ctx source context; wrapped by the collector and used for its checkpoint lock
     * @throws Exception propagated unchanged from the consumer, the collector or message processing
     */
    public void run(SourceContext<OUT> ctx) throws Exception {
        final RMQCollectorImpl messageCollector = new RMQCollectorImpl(ctx);
        final long deliveryTimeout = rmqConnectionConfig.getDeliveryTimeout();

        while (running) {
            // Fetch outside the checkpoint lock so the wait for a delivery does not hold it.
            final Delivery next = consumer.nextDelivery(deliveryTimeout);

            synchronized (ctx.getCheckpointLock()) {
                // A null delivery is skipped (presumably the timeout elapsed -- confirm against
                // the consumer implementation); the end-of-stream check still runs.
                if (next != null) {
                    processMessage(next, messageCollector);
                }
                if (!messageCollector.isEndOfStreamSignalled()) {
                    continue;
                }
                // End of stream: clear the flag and leave the loop while still holding the lock.
                this.running = false;
                return;
            }
        }
    }