public boolean setMessageIdentifiers()

in flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java [402:429]


        public boolean setMessageIdentifiers(String correlationId, long deliveryTag) {
            if (customIdentifiersSet) {
                throw new IllegalStateException(
                        "You can set only a single set of identifiers for a block of messages.");
            }

            this.customIdentifiersSet = true;
            if (!autoAck) {
                if (usesCorrelationId) {
                    Preconditions.checkNotNull(
                            correlationId,
                            "RabbitMQ source was instantiated with usesCorrelationId set to "
                                    + "true yet we couldn't extract the correlation id from it!");
                    if (!addId(correlationId)) {
                        // we have already processed this message
                        try {
                            channel.basicReject(deliveryTag, false);
                        } catch (IOException e) {
                            throw new RuntimeException(
                                    "Message could not be acknowledged with basicReject.", e);
                        }
                        return false;
                    }
                }
                sessionIds.add(deliveryTag);
            }
            return true;
        }