in flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java [154:191]
/**
 * Closes the ActiveMQ consumer, session and connection, in that order, after
 * delegating to {@code super.close()}.
 *
 * <p>Every resource is attempted even if closing an earlier one failed. When
 * {@code logFailuresOnly} is set, {@link JMSException}s are logged and not
 * propagated. Otherwise the first failure is thrown wrapped in a
 * {@link RuntimeException}, and any later failures are attached to it as
 * suppressed exceptions so they are not lost.
 *
 * @throws RuntimeException if closing any resource fails and
 *         {@code logFailuresOnly} is {@code false}
 * @throws Exception if {@code super.close()} throws
 */
public void close() throws Exception {
    super.close();
    RuntimeException exception = null;

    try {
        // Guard against a resource that was never created, so that close()
        // does not die with a NullPointerException and skip the remaining ones.
        if (consumer != null) {
            consumer.close();
        }
    } catch (JMSException e) {
        if (logFailuresOnly) {
            LOG.error("Failed to close ActiveMQ consumer", e);
        } else {
            exception = new RuntimeException("Failed to close ActiveMQ consumer", e);
        }
    }

    try {
        if (session != null) {
            session.close();
        }
    } catch (JMSException e) {
        if (logFailuresOnly) {
            LOG.error("Failed to close ActiveMQ session", e);
        } else {
            RuntimeException failure = new RuntimeException("Failed to close ActiveMQ session", e);
            if (exception == null) {
                exception = failure;
            } else {
                // Keep the first failure as the primary one, but do not drop this one.
                exception.addSuppressed(failure);
            }
        }
    }

    try {
        if (connection != null) {
            connection.close();
        }
    } catch (JMSException e) {
        if (logFailuresOnly) {
            LOG.error("Failed to close ActiveMQ connection", e);
        } else {
            RuntimeException failure = new RuntimeException("Failed to close ActiveMQ connection", e);
            if (exception == null) {
                exception = failure;
            } else {
                exception.addSuppressed(failure);
            }
        }
    }

    if (exception != null) {
        throw exception;
    }
}