public void invoke()

in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java [110:181]


    public void invoke(Message input, Context context) throws Exception {
        sinkInTps.markEvent();

        if (batchFlushOnCheckpoint) {
            synchronized (batchList) {
                batchList.add(input);
            }
            if (batchList.size() >= batchSize) {
                flushSync();
            }
            return;
        }

        long timeStartWriting = System.currentTimeMillis();
        if (async) {
            try {
                SendCallback sendCallback =
                        new SendCallback() {
                            @Override
                            public void onSuccess(SendResult sendResult) {
                                LOG.debug("Async send message success! result: {}", sendResult);
                                long end = System.currentTimeMillis();
                                latencyGauge.report(end - timeStartWriting, 1);
                                outTps.markEvent();
                                outBps.markEvent(input.getBody().length);
                            }

                            @Override
                            public void onException(Throwable throwable) {
                                if (throwable != null) {
                                    LOG.error("Async send message failure!", throwable);
                                }
                            }
                        };
                if (messageQueueSelector != null) {
                    Object arg =
                            StringUtils.isNullOrWhitespaceOnly(messageQueueSelectorArg)
                                    ? null
                                    : input.getProperty(messageQueueSelectorArg);
                    producer.send(input, messageQueueSelector, arg, sendCallback);
                } else {
                    producer.send(input, sendCallback);
                }
            } catch (Exception e) {
                LOG.error("Async send message failure!", e);
            }
        } else {
            try {
                SendResult result;
                if (messageQueueSelector != null) {
                    Object arg =
                            StringUtils.isNullOrWhitespaceOnly(messageQueueSelectorArg)
                                    ? null
                                    : input.getProperty(messageQueueSelectorArg);
                    result = producer.send(input, messageQueueSelector, arg);
                } else {
                    result = producer.send(input);
                }
                LOG.debug("Sync send message result: {}", result);
                if (result.getSendStatus() != SendStatus.SEND_OK) {
                    throw new RemotingException(result.toString());
                }
                long end = System.currentTimeMillis();
                latencyGauge.report(end - timeStartWriting, 1);
                outTps.markEvent();
                outBps.markEvent(input.getBody().length);
            } catch (Exception e) {
                LOG.error("Sync send message exception: ", e);
                throw e;
            }
        }
    }