public InnerConsumerImpl()

in src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java [68:110]


    /**
     * Builds the RocketMQ lite pull consumer and the admin client from the given configuration.
     *
     * <p>If both the access key and the secret key are present (neither null nor blank), the two
     * clients are created with an ACL RPC hook carrying those credentials; otherwise they are
     * created without a hook. Both clients are then pointed at the configured endpoints, bound to
     * the configured consumer group, have the VIP channel disabled, and receive their own instance
     * name of the form {@code <runtime MXBean name>#<group id>#<random UUID>}. Auto commit is
     * switched off on the consumer.
     *
     * @param configuration configuration from which credentials, consumer group and endpoints are
     *     read
     */
    public InnerConsumerImpl(Configuration configuration) {
        this.configuration = configuration;
        this.commonExecutorService = buildExecutorService(configuration);

        final String accessKey =
                configuration.getString(RocketMQSourceOptions.OPTIONAL_ACCESS_KEY);
        final String secretKey =
                configuration.getString(RocketMQSourceOptions.OPTIONAL_SECRET_KEY);

        // ACL is only enabled when BOTH halves of the credential pair are supplied.
        final boolean credentialsMissing =
                StringUtils.isNullOrWhitespaceOnly(accessKey)
                        || StringUtils.isNullOrWhitespaceOnly(secretKey);

        // NOTE(review): the original author flagged that the number of sync pull threads may not
        // be enough; nothing in this constructor overrides it.
        if (credentialsMissing) {
            this.adminExt = new DefaultMQAdminExt();
            this.consumer = new DefaultLitePullConsumer();
        } else {
            AclClientRPCHook credentialHook =
                    new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
            this.adminExt = new DefaultMQAdminExt(credentialHook);
            this.consumer = new DefaultLitePullConsumer(credentialHook);
        }

        final String groupId = configuration.getString(RocketMQSourceOptions.CONSUMER_GROUP);
        final String endPoints = configuration.getString(RocketMQSourceOptions.ENDPOINTS);
        final String runtimeName = ManagementFactory.getRuntimeMXBean().getName();

        // Consumer side: manual offset commit, unique instance name per client.
        final String consumerInstanceName =
                String.join("#", runtimeName, groupId, UUID.randomUUID().toString());
        this.consumer.setNamesrvAddr(endPoints);
        this.consumer.setConsumerGroup(groupId);
        this.consumer.setAutoCommit(false);
        this.consumer.setVipChannelEnabled(false);
        this.consumer.setInstanceName(consumerInstanceName);

        // Admin side: same endpoints and group, but its own freshly generated instance name.
        final String adminInstanceName =
                String.join("#", runtimeName, groupId, UUID.randomUUID().toString());
        this.adminExt.setNamesrvAddr(endPoints);
        this.adminExt.setAdminExtGroup(groupId);
        this.adminExt.setVipChannelEnabled(false);
        this.adminExt.setInstanceName(adminInstanceName);
    }