in core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java [65:96]
public WorkerThread(String threadName,
TopologyBuilder topologyBuilder,
Properties properties,
ScheduledExecutorService executor) throws MQClientException {
super(threadName);
this.topologyBuilder = topologyBuilder;
this.properties = properties;
jobId = topologyBuilder.getJobId();
this.executor = executor;
String groupName = String.join("_", jobId, ROCKETMQ_STREAMS_CONSUMER_GROUP);
RocketMQClient rocketMQClient = new RocketMQClient(properties.getProperty(MixAll.NAMESRV_ADDR_PROPERTY));
Set<String> topicNames = topologyBuilder.getSourceTopic();
DefaultLitePullConsumer unionConsumer = rocketMQClient.pullConsumer(groupName, topicNames);
MessageQueueListener originListener = unionConsumer.getMessageQueueListener();
MessageQueueListenerWrapper wrapper = new MessageQueueListenerWrapper(originListener, topologyBuilder);
unionConsumer.setMessageQueueListener(wrapper);
DefaultMQProducer producer = rocketMQClient.producer(groupName);
DefaultMQAdminExt mqAdmin = rocketMQClient.getMQAdmin();
RocksDBStore rocksDBStore = new RocksDBStore(threadName);
RocketMQStore store = new RocketMQStore(producer, rocksDBStore, mqAdmin, this.properties);
this.planetaryEngine = new PlanetaryEngine<>(unionConsumer, producer, store, mqAdmin, wrapper);
}