in flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java [122:151]
/*
 * Opens the JMS resources used by this source: a connection obtained from
 * connectionFactory, a non-transacted session, and a consumer on the
 * destination identified by destinationType / destinationName.
 *
 * The acknowledge mode depends on the runtime context: when it is a
 * StreamingRuntimeContext with checkpointing enabled, autoAck is switched off
 * and the session uses ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; otherwise
 * autoAck is switched on and the session uses AUTO_ACKNOWLEDGE.
 *
 * config is forwarded unchanged to super.open(config).
 *
 * Throws whatever super.open or any of the JMS setup calls throw. If setup
 * fails after the connection was created, the connection is closed before the
 * original exception is rethrown.
 */
public void open(Configuration config) throws Exception {
    super.open(config);
    // Create a Connection
    connection = connectionFactory.createConnection();
    try {
        // Register the listener before the connection is started so that an
        // asynchronous failure raised during startup is not silently missed.
        exceptionListener = new AMQExceptionListener(LOG, logFailuresOnly);
        connection.setExceptionListener(exceptionListener);

        RuntimeContext runtimeContext = getRuntimeContext();
        int acknowledgeType;
        if (runtimeContext instanceof StreamingRuntimeContext
                && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) {
            autoAck = false;
            acknowledgeType = ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
        } else {
            autoAck = true;
            acknowledgeType = ActiveMQSession.AUTO_ACKNOWLEDGE;
        }
        // Create a Session
        session = connection.createSession(false, acknowledgeType);
        // Create the destination (Topic or Queue)
        Destination destination = AMQUtil.getDestination(session, destinationType, destinationName);
        // Create a MessageConsumer from the Session to the Topic or
        // Queue
        consumer = session.createConsumer(destination);

        // Start delivery only once the consumer exists; JMS recommends keeping
        // the connection stopped until setup is complete.
        connection.start();
    } catch (Exception setupFailure) {
        // Do not leak the broker connection when setup fails part-way through.
        try {
            connection.close();
        } catch (Exception closeFailure) {
            // Keep the original failure as the primary exception.
            setupFailure.addSuppressed(closeFailure);
        }
        throw setupFailure;
    }
    runningChecker.setIsRunning(true);
}