public void open()

in flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java [122:151]


    public void open(Configuration config) throws Exception {
        super.open(config);
        // Create a Connection
        connection = connectionFactory.createConnection();
        connection.start();

        exceptionListener = new AMQExceptionListener(LOG, logFailuresOnly);
        connection.setExceptionListener(exceptionListener);

        RuntimeContext runtimeContext = getRuntimeContext();
        int acknowledgeType;
        if (runtimeContext instanceof StreamingRuntimeContext
            && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) {
            autoAck = false;
            acknowledgeType = ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
        } else {
            autoAck = true;
            acknowledgeType = ActiveMQSession.AUTO_ACKNOWLEDGE;
        }
        // Create a Session
        session = connection.createSession(false, acknowledgeType);

        // Create the destination (Topic or Queue)
        Destination destination = AMQUtil.getDestination(session, destinationType, destinationName);

        // Create a MessageConsumer from the Session to the Topic or
        // Queue
        consumer = session.createConsumer(destination);
        runningChecker.setIsRunning(true);
    }