in flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java [277:312]
public void close() throws Exception {
super.close();
Exception exception = null;
try {
if (consumer != null && channel != null) {
channel.basicCancel(consumer.getConsumerTag());
}
} catch (IOException e) {
exception =
new RuntimeException(
"Error while cancelling RMQ consumer on "
+ queueName
+ " at "
+ rmqConnectionConfig.getHost(),
e);
}
try {
IOUtils.closeAll(channel, connection);
} catch (IOException e) {
exception =
ExceptionUtils.firstOrSuppressed(
new RuntimeException(
"Error while closing RMQ source with "
+ queueName
+ " at "
+ rmqConnectionConfig.getHost(),
e),
exception);
}
if (exception != null) {
throw exception;
}
}