public InnerProducerImpl()

in src/main/java/org/apache/flink/connector/rocketmq/sink/InnerProducerImpl.java [65:135]


    /**
     * Creates and configures the underlying {@link TransactionMQProducer} from the sink
     * configuration.
     *
     * <p>The constructor performs the following steps:
     *
     * <ul>
     *   <li>reads the producer group and endpoints from {@link RocketMQSinkOptions};
     *   <li>attaches an {@link AclClientRPCHook} only when both the access key and the secret key
     *       are non-blank;
     *   <li>assigns an instance name of the form {@code <runtime-name>#<groupId>#<random-uuid>};
     *   <li>installs a fixed-size executor with a bounded queue of 2000 entries;
     *   <li>registers a {@link TransactionListener} that never commits on its own: it answers
     *       {@code UNKNOW} until the configured transaction timeout has elapsed since the message
     *       was born, and {@code ROLLBACK_MESSAGE} afterwards.
     * </ul>
     *
     * @param configuration sink configuration providing the producer group, endpoints, optional
     *     credentials, executor size and transaction timeout
     */
    public InnerProducerImpl(Configuration configuration) {
        this.configuration = configuration;
        this.groupId = configuration.getString(RocketMQSinkOptions.PRODUCER_GROUP);
        this.endPoints = configuration.getString(RocketMQSinkOptions.ENDPOINTS);

        final String ak = configuration.getString(RocketMQSinkOptions.OPTIONAL_ACCESS_KEY);
        final String sk = configuration.getString(RocketMQSinkOptions.OPTIONAL_SECRET_KEY);

        // Credentials are only usable as a pair; if either half is blank, skip the ACL hook.
        final boolean credentialsIncomplete =
                StringUtils.isNullOrWhitespaceOnly(ak) || StringUtils.isNullOrWhitespaceOnly(sk);
        if (credentialsIncomplete) {
            producer = new TransactionMQProducer(groupId);
        } else {
            final SessionCredentials credentials = new SessionCredentials(ak, sk);
            producer = new TransactionMQProducer(groupId, new AclClientRPCHook(credentials));
        }

        producer.setNamesrvAddr(endPoints);
        producer.setVipChannelEnabled(false);

        // The random UUID keeps the instance name distinct for every producer that is created.
        final String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
        final String randomSuffix = UUID.randomUUID().toString();
        producer.setInstanceName(String.join("#", runtimeName, groupId, randomSuffix));

        // Fixed-size pool: core size and maximum size are the same configured value.
        // NOTE(review): with core == max the 100 s keep-alive presumably never applies unless
        // allowCoreThreadTimeOut(true) is enabled somewhere else - confirm the intent.
        final int poolSize = configuration.getInteger(RocketMQSinkOptions.EXECUTOR_NUM);
        final ThreadPoolExecutor executor =
                new ThreadPoolExecutor(
                        poolSize,
                        poolSize,
                        100,
                        TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(2000),
                        task -> {
                            // Every worker thread carries the producer group as its name.
                            Thread worker = new Thread(task);
                            worker.setName(groupId);
                            return worker;
                        });
        producer.setExecutorService(executor);

        // This listener never reports a commit; it only answers "unknown" or "rollback".
        final TransactionListener listener =
                new TransactionListener() {
                    @Override
                    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                        // There is no local transaction to run here; commit or rollback is
                        // invoked directly instead.
                        return LocalTransactionState.UNKNOW;
                    }

                    @Override
                    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                        final long timeoutMillis =
                                configuration.get(RocketMQSinkOptions.TRANSACTION_TIMEOUT);
                        final long ageMillis = System.currentTimeMillis() - msg.getBornTimestamp();

                        if (ageMillis <= timeoutMillis) {
                            // Still inside the allowed window: leave the transaction undecided.
                            // NOTE(review): the label reads "msgId" but the value logged is the
                            // transaction id - confirm this is intended.
                            LOG.info(
                                    "Not exceeded the transaction maximum time, return unknown. topic={}, msgId={}",
                                    msg.getTopic(),
                                    msg.getTransactionId());
                            return LocalTransactionState.UNKNOW;
                        }

                        // The message is older than the configured timeout: roll it back.
                        LOG.info(
                                "Exceeded the transaction maximum time, return rollback. topic={}, msgId={}",
                                msg.getTopic(),
                                msg.getTransactionId());
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                };
        producer.setTransactionListener(listener);
    }