public void close()

in flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java [277:312]


    /**
     * Releases the RabbitMQ resources held by this source.
     *
     * <p>After delegating to {@code super.close()}, the consumer is cancelled on the channel (only
     * when both are non-null) and the channel and connection are then handed to {@code
     * IOUtils.closeAll}. Both cleanup steps are always attempted: a failure in the cancel step is
     * recorded instead of being thrown immediately, so the close step still runs. Failures from
     * the two steps are wrapped in a {@link RuntimeException} naming the queue and host, merged
     * via {@code ExceptionUtils.firstOrSuppressed}, and thrown once at the end.
     *
     * @throws Exception if {@code super.close()} fails, or if cancelling the consumer or closing
     *     the channel/connection fails
     */
    public void close() throws Exception {
        super.close();
        Exception exception = null;

        try {
            if (consumer != null && channel != null) {
                channel.basicCancel(consumer.getConsumerTag());
            }
        } catch (Exception e) {
            // Deliberately broad: an unchecked failure here (e.g. the channel was already shut
            // down) must not prevent the channel and connection from being closed below.
            exception =
                    new RuntimeException(
                            "Error while cancelling RMQ consumer on "
                                    + queueName
                                    + " at "
                                    + rmqConnectionConfig.getHost(),
                            e);
        }

        try {
            IOUtils.closeAll(channel, connection);
        } catch (Exception e) {
            // Deliberately broad: a non-IOException failure would otherwise escape directly and
            // discard any cancel failure recorded above.
            exception =
                    ExceptionUtils.firstOrSuppressed(
                            new RuntimeException(
                                    "Error while closing RMQ source with "
                                            + queueName
                                            + " at "
                                            + rmqConnectionConfig.getHost(),
                                    e),
                            exception);
        }

        if (exception != null) {
            throw exception;
        }
    }