in flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java [380:393]
/**
 * Forwards a single record to the wrapped context, unless it is filtered out first.
 *
 * <p>The record is dropped without being forwarded when either:
 *
 * <ul>
 *   <li>no custom identifiers have been set yet and registering the stored
 *       {@code correlationId}/{@code deliveryTag} pair via {@code setMessageIdentifiers}
 *       returns {@code false}, or
 *   <li>{@code isEndOfStream(record)} returns {@code true}; in that case the
 *       {@code endOfStreamSignalled} flag is raised instead.
 * </ul>
 *
 * @param record the record to emit
 */
public void collect(OUT record) {
    // The short-circuit keeps setMessageIdentifiers from being called when identifiers were
    // already set. A false result is treated as "not a new message" and the record is
    // skipped (presumably a duplicate delivery -- confirm against setMessageIdentifiers).
    if (!customIdentifiersSet && !setMessageIdentifiers(correlationId, deliveryTag)) {
        return;
    }

    if (!isEndOfStream(record)) {
        ctx.collect(record);
    } else {
        // The end-of-stream marker itself is not forwarded; only the flag is recorded.
        this.endOfStreamSignalled = true;
    }
}