public void invoke()

in flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java [194:235]


    public void invoke(IN value) {
        try {
            byte[] msg = schema.serialize(value);

            if (publishOptions == null) {
                channel.basicPublish("", queueName, null, msg);
            } else {
                boolean mandatory = publishOptions.computeMandatory(value);
                boolean immediate = publishOptions.computeImmediate(value);

                Preconditions.checkState(
                        !(returnListener == null && (mandatory || immediate)),
                        "Setting mandatory and/or immediate flags to true requires a ReturnListener.");

                String rk = publishOptions.computeRoutingKey(value);
                String exchange = publishOptions.computeExchange(value);

                channel.basicPublish(
                        exchange,
                        rk,
                        mandatory,
                        immediate,
                        publishOptions.computeProperties(value),
                        msg);
            }
        } catch (IOException e) {
            if (logFailuresOnly) {
                LOG.error(
                        "Cannot send RMQ message {} at {}",
                        queueName,
                        rmqConnectionConfig.getHost(),
                        e);
            } else {
                throw new RuntimeException(
                        "Cannot send RMQ message "
                                + queueName
                                + " at "
                                + rmqConnectionConfig.getHost(),
                        e);
            }
        }
    }