public WorkerThread()

in core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java [65:96]


    /**
     * Builds a worker thread and wires up everything its engine needs: a lite pull
     * consumer over the topology's source topics, a producer, an admin handle, and a
     * store that combines the producer, a RocksDB store and the admin handle.
     *
     * <p>The consumer group (also used as the producer group) is the job id and
     * {@code ROCKETMQ_STREAMS_CONSUMER_GROUP} joined with an underscore. The consumer's
     * existing message-queue listener is wrapped in a {@link MessageQueueListenerWrapper}
     * and the wrapper is installed in its place; the same wrapper is handed to the engine.
     *
     * @param threadName      name passed to the superclass constructor; also passed to the
     *                        {@link RocksDBStore} constructor
     * @param topologyBuilder supplies the job id and the source topics, and is given to the
     *                        listener wrapper
     * @param properties      configuration; the value under {@code MixAll.NAMESRV_ADDR_PROPERTY}
     *                        is passed to {@link RocketMQClient}, and the whole object is
     *                        forwarded to the store
     * @param executor        scheduled executor retained in a field (not used in this constructor)
     * @throws MQClientException declared by this constructor; presumably raised by the
     *                           client/consumer/producer setup calls — confirm against
     *                           {@link RocketMQClient}
     */
    public WorkerThread(String threadName,
                        TopologyBuilder topologyBuilder,
                        Properties properties,
                        ScheduledExecutorService executor) throws MQClientException {
        super(threadName);

        // Keep references to the collaborators handed in by the caller.
        this.topologyBuilder = topologyBuilder;
        this.properties = properties;
        this.jobId = topologyBuilder.getJobId();
        this.executor = executor;

        // One group name is shared by the consumer and the producer.
        final String group = this.jobId + "_" + ROCKETMQ_STREAMS_CONSUMER_GROUP;

        final String nameServerAddress = properties.getProperty(MixAll.NAMESRV_ADDR_PROPERTY);
        final RocketMQClient client = new RocketMQClient(nameServerAddress);

        // Consumer over all source topics of the topology.
        final DefaultLitePullConsumer consumer =
                client.pullConsumer(group, topologyBuilder.getSourceTopic());

        // Replace the consumer's listener with a wrapper around the one it already had.
        final MessageQueueListenerWrapper listenerWrapper =
                new MessageQueueListenerWrapper(consumer.getMessageQueueListener(), topologyBuilder);
        consumer.setMessageQueueListener(listenerWrapper);

        final DefaultMQProducer mqProducer = client.producer(group);
        final DefaultMQAdminExt adminExt = client.getMQAdmin();

        // The RocksDB store is constructed from this thread's name.
        final RocketMQStore stateStore =
                new RocketMQStore(mqProducer, new RocksDBStore(threadName), adminExt, this.properties);

        this.planetaryEngine =
                new PlanetaryEngine<>(consumer, mqProducer, stateStore, adminExt, listenerWrapper);
    }