in flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java [323:339]
public void run(SourceContext<OUT> ctx) throws Exception {
    // Consume loop of the source: repeatedly asks the consumer for the next
    // delivery (bounded by the configured delivery timeout), hands every non-null
    // delivery to processMessage, and stops once the collector reports that
    // end-of-stream was signalled or the running flag has been cleared.
    final RMQCollectorImpl rmqCollector = new RMQCollectorImpl(ctx);
    final long deliveryTimeout = rmqConnectionConfig.getDeliveryTimeout();

    boolean endOfStream = false;
    while (running && !endOfStream) {
        // The fetch happens before entering the checkpoint lock, so the lock is
        // not held during this call. A null result is simply skipped below
        // (presumably it means the timeout elapsed -- confirm against the consumer).
        final Delivery next = consumer.nextDelivery(deliveryTimeout);

        synchronized (ctx.getCheckpointLock()) {
            if (next != null) {
                processMessage(next, rmqCollector);
            }

            // Checked on every pass, even when no delivery was received.
            endOfStream = rmqCollector.isEndOfStreamSignalled();
            if (endOfStream) {
                // Clear the flag while still holding the lock, then fall out of the loop.
                this.running = false;
            }
        }
    }
}