in src/main/java/org/apache/flink/connector/rocketmq/sink/InnerProducerImpl.java [65:135]
/**
 * Builds the transactional RocketMQ producer for this sink from the supplied configuration.
 *
 * <p>The producer group and endpoints are taken from {@link RocketMQSinkOptions}. If both an
 * access key and a secret key are configured (neither null nor whitespace-only), the producer
 * is created with an ACL RPC hook carrying those credentials; otherwise it is created without
 * a hook. The constructor only configures the producer; it does not start it.
 *
 * @param configuration sink configuration supplying the producer group, endpoints, optional
 *     credentials, check-executor size and transaction timeout
 */
public InnerProducerImpl(Configuration configuration) {
    this.configuration = configuration;
    this.groupId = configuration.getString(RocketMQSinkOptions.PRODUCER_GROUP);
    this.endPoints = configuration.getString(RocketMQSinkOptions.ENDPOINTS);

    String accessKey = configuration.getString(RocketMQSinkOptions.OPTIONAL_ACCESS_KEY);
    String secretKey = configuration.getString(RocketMQSinkOptions.OPTIONAL_SECRET_KEY);
    boolean credentialsIncomplete =
            StringUtils.isNullOrWhitespaceOnly(accessKey)
                    || StringUtils.isNullOrWhitespaceOnly(secretKey);
    if (credentialsIncomplete) {
        // Without a complete key pair the producer is created with no ACL hook.
        producer = new TransactionMQProducer(groupId);
    } else {
        SessionCredentials credentials = new SessionCredentials(accessKey, secretKey);
        producer = new TransactionMQProducer(groupId, new AclClientRPCHook(credentials));
    }

    producer.setNamesrvAddr(endPoints);
    producer.setVipChannelEnabled(false);

    // Instance name = "<runtime MXBean name>#<group>#<random UUID>", so each instance
    // created by this constructor gets a distinct name.
    String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
    String instanceName = String.join("#", runtimeName, groupId, UUID.randomUUID().toString());
    producer.setInstanceName(instanceName);

    // Fixed-size pool (core == max) backed by a bounded queue of 2000 tasks; every worker
    // thread is named after the producer group.
    int poolSize = configuration.getInteger(RocketMQSinkOptions.EXECUTOR_NUM);
    ArrayBlockingQueue<Runnable> pendingTasks = new ArrayBlockingQueue<>(2000);
    ThreadPoolExecutor checkExecutor =
            new ThreadPoolExecutor(
                    poolSize,
                    poolSize,
                    100,
                    TimeUnit.SECONDS,
                    pendingTasks,
                    task -> {
                        Thread worker = new Thread(task);
                        worker.setName(groupId);
                        return worker;
                    });
    producer.setExecutorService(checkExecutor);

    TransactionListener listener =
            new TransactionListener() {
                /**
                 * Never decides the transaction outcome here: no local transaction is
                 * executed, because commit or rollback is issued directly elsewhere.
                 */
                @Override
                public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                    return LocalTransactionState.UNKNOW;
                }

                /**
                 * Answers a transaction check: keeps reporting "unknown" until the message
                 * is older than the configured transaction timeout, then asks for rollback.
                 */
                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    long timeoutMillis =
                            configuration.get(RocketMQSinkOptions.TRANSACTION_TIMEOUT);
                    long ageMillis = System.currentTimeMillis() - msg.getBornTimestamp();

                    if (ageMillis <= timeoutMillis) {
                        LOG.info(
                                "Not exceeded the transaction maximum time, return unknown. topic={}, msgId={}",
                                msg.getTopic(),
                                msg.getTransactionId());
                        return LocalTransactionState.UNKNOW;
                    }

                    LOG.info(
                            "Exceeded the transaction maximum time, return rollback. topic={}, msgId={}",
                            msg.getTopic(),
                            msg.getTransactionId());
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            };
    producer.setTransactionListener(listener);
}