in flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java [168:186]
public void open(Configuration config) throws Exception {
schema.open(
RuntimeContextInitializationContextAdapters.serializationAdapter(
getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
try {
connection = setupConnection();
channel = connection.createChannel();
if (channel == null) {
throw new RuntimeException("None of RabbitMQ channels are available");
}
setupQueue();
if (returnListener != null) {
channel.addReturnListener(returnListener);
}
} catch (IOException e) {
throw new RuntimeException("Error while creating the RabbitMQ channel", e);
}
}