in flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java [237:274]
public void open(Configuration config) throws Exception {
    // Opens the source: establishes the RabbitMQ connection and channel, sets up the queue,
    // registers a consumer on queueName, and finally opens the delivery deserializer.
    //
    // Failure contract:
    //  - a RuntimeException is thrown when no channel is available;
    //  - an IOException raised during setup is wrapped in a RuntimeException naming the
    //    queue and host;
    //  - any other exception propagates unchanged.
    // In every one of these cases the channel/connection opened so far are closed quietly
    // before the exception leaves this method.
    super.open(config);

    // Flipped to true only once the consumer has been registered; the finally block uses
    // it to decide whether the partially initialised connection must be released.
    boolean setupCompleted = false;
    try {
        connection = setupConnection();
        channel = setupChannel(connection);
        if (channel == null) {
            throw new RuntimeException("None of RabbitMQ channels are available");
        }
        setupQueue();
        consumer = new QueueingConsumer(channel);

        RuntimeContext runtimeContext = getRuntimeContext();
        if (runtimeContext instanceof StreamingRuntimeContext
                && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) {
            autoAck = false;
            // enables transaction mode
            channel.txSelect();
        } else {
            autoAck = true;
        }

        LOG.debug("Starting RabbitMQ source with autoAck status: " + autoAck);
        channel.basicConsume(queueName, autoAck, consumer);
        setupCompleted = true;
    } catch (IOException e) {
        throw new RuntimeException(
                "Cannot create RMQ connection with "
                        + queueName
                        + " at "
                        + rmqConnectionConfig.getHost(),
                e);
    } finally {
        if (!setupCompleted) {
            // Covers the IOException path as well as every other failure (e.g. the
            // "no channel available" RuntimeException above), which previously left the
            // already-established connection open.
            IOUtils.closeAllQuietly(channel, connection);
        }
    }

    // Kept outside the try block so that deserializer failures are not reported as
    // connection failures.
    this.deliveryDeserializer.open(
            RuntimeContextInitializationContextAdapters.deserializationAdapter(
                    getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
    running = true;
}