in flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java [402:429]
/**
 * Registers the identifiers of the message currently being handled and reports whether the
 * message should be processed.
 *
 * <p>May be invoked only once while {@code customIdentifiersSet} is false; the flag is raised
 * on the first call, before any further validation takes place.
 *
 * <p>When {@code autoAck} is enabled nothing is tracked and the method simply returns
 * {@code true}. Otherwise, if {@code usesCorrelationId} is enabled, the correlation id is
 * handed to {@code addId}; a {@code false} result there is treated as "already processed",
 * in which case the delivery is rejected on the channel via
 * {@code basicReject(deliveryTag, false)} and {@code false} is returned. In all remaining
 * cases the delivery tag is recorded in {@code sessionIds}.
 *
 * @param correlationId correlation id of the message; must be non-null when the source uses
 *     correlation ids and auto-ack is disabled
 * @param deliveryTag delivery tag of the message
 * @return {@code true} if the message should be processed, {@code false} if it was recognised
 *     as a duplicate and rejected
 * @throws IllegalStateException if identifiers have already been set
 * @throws RuntimeException if rejecting a duplicate message fails with an {@link IOException}
 */
public boolean setMessageIdentifiers(String correlationId, long deliveryTag) {
    if (customIdentifiersSet) {
        throw new IllegalStateException(
                "You can set only a single set of identifiers for a block of messages.");
    }
    this.customIdentifiersSet = true;

    // With auto-ack there is nothing to track: neither correlation ids nor delivery tags.
    if (autoAck) {
        return true;
    }

    if (usesCorrelationId) {
        Preconditions.checkNotNull(
                correlationId,
                "RabbitMQ source was instantiated with usesCorrelationId set to "
                        + "true yet we couldn't extract the correlation id from it!");

        final boolean newlyAdded = addId(correlationId);
        if (!newlyAdded) {
            // The id was seen before, i.e. this message has already been processed:
            // reject it and tell the caller to skip it.
            try {
                channel.basicReject(deliveryTag, false);
            } catch (IOException e) {
                throw new RuntimeException(
                        "Message could not be acknowledged with basicReject.", e);
            }
            return false;
        }
    }

    // Remember the delivery tag for this (non-duplicate) message.
    sessionIds.add(deliveryTag);
    return true;
}