in flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java [194:212]
protected void acknowledgeIDs(long checkpointId, Set<String> UIds) {
try {
for (String messageId : UIds) {
Message unacknowledgedMessage = unacknowledgedMessages.get(messageId);
if (unacknowledgedMessage != null) {
unacknowledgedMessage.acknowledge();
unacknowledgedMessages.remove(messageId);
} else {
LOG.warn("Tried to acknowledge unknown ActiveMQ message id: {}", messageId);
}
}
} catch (JMSException e) {
if (logFailuresOnly) {
LOG.error("Failed to acknowledge ActiveMQ message");
} else {
throw new RuntimeException("Failed to acknowledge ActiveMQ message");
}
}
}