public void open()

in flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java [237:274]


    /**
     * Opens this source: establishes the RabbitMQ connection and channel, sets up the queue,
     * registers a {@link QueueingConsumer} on the channel and opens the delivery deserializer.
     *
     * <p>When the runtime context is a {@link StreamingRuntimeContext} with checkpointing
     * enabled, messages are consumed with {@code autoAck = false} and the channel is switched to
     * transaction mode via {@code txSelect()}; otherwise {@code autoAck} is {@code true}.
     *
     * <p>If any step after {@code super.open(config)} fails, the channel and connection are
     * closed quietly before the failure is propagated, so a failed {@code open} does not leave
     * them open. {@code running} is only set to {@code true} once every step has succeeded.
     *
     * @param config the configuration forwarded to {@code super.open(config)}
     * @throws RuntimeException if {@code setupChannel} returns {@code null}, or wrapping an
     *     {@link IOException} raised while setting up the connection, channel, queue or consumer
     * @throws Exception any other failure raised by the setup steps or by opening the
     *     deserializer; it is rethrown unchanged after cleanup
     */
    public void open(Configuration config) throws Exception {
        super.open(config);
        try {
            try {
                connection = setupConnection();
                channel = setupChannel(connection);
                if (channel == null) {
                    // Not an IOException: the outer catch is what releases the connection here.
                    throw new RuntimeException("None of RabbitMQ channels are available");
                }
                setupQueue();
                consumer = new QueueingConsumer(channel);

                RuntimeContext runtimeContext = getRuntimeContext();
                if (runtimeContext instanceof StreamingRuntimeContext
                        && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) {
                    autoAck = false;
                    // enables transaction mode
                    channel.txSelect();
                } else {
                    autoAck = true;
                }

                LOG.debug("Starting RabbitMQ source with autoAck status: {}", autoAck);
                channel.basicConsume(queueName, autoAck, consumer);
            } catch (IOException e) {
                // Only IO failures of the connection/consumer setup are wrapped with this message.
                throw new RuntimeException(
                        "Cannot create RMQ connection with "
                                + queueName
                                + " at "
                                + rmqConnectionConfig.getHost(),
                        e);
            }

            this.deliveryDeserializer.open(
                    RuntimeContextInitializationContextAdapters.deserializationAdapter(
                            getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
        } catch (Exception e) {
            // Release whatever was opened so far on ANY failure (not just IOException), then
            // propagate the original exception untouched.
            IOUtils.closeAllQuietly(channel, connection);
            throw e;
        }
        running = true;
    }