protected void acknowledgeIDs()

in flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java [194:212]


    /**
     * Acknowledges the ActiveMQ messages identified by the given ids.
     *
     * <p>Each id is looked up in {@code unacknowledgedMessages}. A message that is found is
     * acknowledged and then removed from that map; an id with no matching entry only produces
     * a warning in the log.
     *
     * <p>The whole loop runs inside a single {@code try} block, so the first
     * {@link JMSException} stops processing of the remaining ids. The message whose
     * acknowledgement failed stays in {@code unacknowledgedMessages}.
     *
     * @param checkpointId id of the checkpoint being acknowledged (not used by this method)
     * @param UIds ids of the messages to acknowledge
     * @throws RuntimeException if acknowledging a message fails and {@code logFailuresOnly}
     *     is {@code false}; the originating {@link JMSException} is attached as the cause
     */
    protected void acknowledgeIDs(long checkpointId, Set<String> UIds) {
        try {
            for (String messageId : UIds) {
                Message unacknowledgedMessage = unacknowledgedMessages.get(messageId);
                if (unacknowledgedMessage != null) {
                    // Remove only after a successful acknowledge, so a failed message is kept.
                    unacknowledgedMessage.acknowledge();
                    unacknowledgedMessages.remove(messageId);
                } else {
                    LOG.warn("Tried to acknowledge unknown ActiveMQ message id: {}", messageId);
                }
            }
        } catch (JMSException e) {
            if (logFailuresOnly) {
                // Pass the exception along so the stack trace ends up in the log.
                LOG.error("Failed to acknowledge ActiveMQ message", e);
            } else {
                // Keep the JMS failure as the cause instead of discarding it.
                throw new RuntimeException("Failed to acknowledge ActiveMQ message", e);
            }
        }
    }