in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java [110:181]
/**
 * Handles one incoming message: buffers it for a batched flush, or sends it through the
 * producer right away (asynchronously or synchronously, depending on configuration).
 *
 * <ul>
 *   <li><b>Batch mode</b> ({@code batchFlushOnCheckpoint}): the message is appended to
 *       {@code batchList}; when the buffer has reached {@code batchSize} entries,
 *       {@code flushSync()} is invoked. Nothing is sent directly from this method.
 *   <li><b>Async mode</b> ({@code async}): the message is handed to the producer together
 *       with a callback. Failures, whether reported through the callback or thrown while
 *       submitting, are logged only and never propagated to the caller.
 *   <li><b>Sync mode</b>: the send result is checked, and any status other than
 *       {@code SendStatus.SEND_OK} is turned into a {@code RemotingException}. Every
 *       exception on this path is logged and rethrown.
 * </ul>
 *
 * @param input the message to buffer or send
 * @param context the sink context; not used by this method
 * @throws Exception if a synchronous send fails or returns a non-OK status, or if
 *     {@code flushSync()} throws while flushing a full batch
 */
public void invoke(Message input, Context context) throws Exception {
    sinkInTps.markEvent();

    if (batchFlushOnCheckpoint) {
        // Decide whether the buffer is full while still holding the lock that guards the
        // add, so the size is never read concurrently with another mutation of the list.
        final boolean batchFull;
        synchronized (batchList) {
            batchList.add(input);
            batchFull = batchList.size() >= batchSize;
        }
        // The flush itself is triggered outside the lock, exactly as before.
        if (batchFull) {
            flushSync();
        }
        return;
    }

    final long timeStartWriting = System.currentTimeMillis();

    if (async) {
        try {
            SendCallback sendCallback =
                    new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            LOG.debug("Async send message success! result: {}", sendResult);
                            reportSendSuccess(input, timeStartWriting);
                        }

                        @Override
                        public void onException(Throwable throwable) {
                            if (throwable != null) {
                                LOG.error("Async send message failure!", throwable);
                            }
                        }
                    };
            if (messageQueueSelector != null) {
                producer.send(input, messageQueueSelector, selectorArgFor(input), sendCallback);
            } else {
                producer.send(input, sendCallback);
            }
        } catch (Exception e) {
            // Best-effort by design: a failed async submission is logged, not rethrown.
            LOG.error("Async send message failure!", e);
        }
        return;
    }

    try {
        final SendResult result;
        if (messageQueueSelector != null) {
            result = producer.send(input, messageQueueSelector, selectorArgFor(input));
        } else {
            result = producer.send(input);
        }
        LOG.debug("Sync send message result: {}", result);
        if (result.getSendStatus() != SendStatus.SEND_OK) {
            throw new RemotingException(result.toString());
        }
        reportSendSuccess(input, timeStartWriting);
    } catch (Exception e) {
        LOG.error("Sync send message exception: ", e);
        throw e;
    }
}

/**
 * Resolves the argument passed to the message queue selector for the given message.
 *
 * @param input the message whose property is looked up
 * @return {@code null} when {@code messageQueueSelectorArg} is null or whitespace-only;
 *     otherwise the value of the message property named by {@code messageQueueSelectorArg}
 */
private Object selectorArgFor(Message input) {
    return StringUtils.isNullOrWhitespaceOnly(messageQueueSelectorArg)
            ? null
            : input.getProperty(messageQueueSelectorArg);
}

/**
 * Records the metrics for one successfully sent message: elapsed time since
 * {@code startMillis}, one outgoing event, and the message body length in bytes.
 *
 * @param input the message that was sent
 * @param startMillis the {@code System.currentTimeMillis()} value taken before sending
 */
private void reportSendSuccess(Message input, long startMillis) {
    long end = System.currentTimeMillis();
    latencyGauge.report(end - startMillis, 1);
    outTps.markEvent();
    outBps.markEvent(input.getBody().length);
}