in flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java [194:235]
public void invoke(IN value) {
try {
byte[] msg = schema.serialize(value);
if (publishOptions == null) {
channel.basicPublish("", queueName, null, msg);
} else {
boolean mandatory = publishOptions.computeMandatory(value);
boolean immediate = publishOptions.computeImmediate(value);
Preconditions.checkState(
!(returnListener == null && (mandatory || immediate)),
"Setting mandatory and/or immediate flags to true requires a ReturnListener.");
String rk = publishOptions.computeRoutingKey(value);
String exchange = publishOptions.computeExchange(value);
channel.basicPublish(
exchange,
rk,
mandatory,
immediate,
publishOptions.computeProperties(value),
msg);
}
} catch (IOException e) {
if (logFailuresOnly) {
LOG.error(
"Cannot send RMQ message {} at {}",
queueName,
rmqConnectionConfig.getHost(),
e);
} else {
throw new RuntimeException(
"Cannot send RMQ message "
+ queueName
+ " at "
+ rmqConnectionConfig.getHost(),
e);
}
}
}