public PlanetaryEngine()

in core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java [131:157]


        /**
         * Builds the engine: keeps references to its collaborators, installs a recover handler on the
         * listener wrapper, creates the idle-window scanner and schedules the recurring commit task on
         * the enclosing {@code WorkerThread}'s executor.
         *
         * @param unionConsumer consumer retained by this engine
         * @param producer      producer retained by this engine
         * @param stateStore    state store retained by this engine; the recover handler delegates to
         *                      its {@code recover(addQueue, removeQueue)} method
         * @param mqAdmin       admin client retained by this engine
         * @param wrapper       listener wrapper on which the recover handler is registered
         */
        public PlanetaryEngine(DefaultLitePullConsumer unionConsumer, DefaultMQProducer producer, StateStore stateStore,
                               DefaultMQAdminExt mqAdmin, MessageQueueListenerWrapper wrapper) {
            this.stateStore = stateStore;
            this.mqAdmin = mqAdmin;
            this.wrapper = wrapper;
            this.unionConsumer = unionConsumer;
            this.producer = producer;

            // The handler never throws: it hands the failure back as its return value,
            // and returns null when recovery completed without error.
            this.wrapper.setRecoverHandler((added, removed) -> {
                Throwable failure = null;
                try {
                    this.stateStore.recover(added, removed);
                } catch (Throwable t) {
                    logger.error("recover error.", t);
                    failure = t;
                }
                return failure;
            });

            // Both settings fall back to 2000 when the key is absent from the worker's properties.
            final Integer idleTime =
                    (Integer) WorkerThread.this.properties.getOrDefault(StreamConfig.IDLE_TIME_TO_FIRE_WINDOW, 2000);
            final int commitInterval =
                    (Integer) WorkerThread.this.properties.getOrDefault(StreamConfig.COMMIT_STATE_INTERNAL_MS, 2000);

            this.idleWindowScaner = new IdleWindowScaner(idleTime, executor);

            // Every Throwable is logged and swallowed so a failed commit does not escape the task.
            // NOTE(review): presumably the executor is a ScheduledExecutorService, where an escaping
            // exception would cancel all later runs - confirm against the field's declared type.
            final Runnable commitTask = () -> {
                try {
                    doCommit(mq2Commit);
                } catch (Throwable t) {
                    logger.error("commit offset and state error.", t);
                }
            };

            // First run after 1000 ms, then once every commitInterval milliseconds.
            WorkerThread.this.executor.scheduleAtFixedRate(commitTask, 1000, commitInterval, TimeUnit.MILLISECONDS);
        }