in core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java [131:157]
public PlanetaryEngine(DefaultLitePullConsumer unionConsumer, DefaultMQProducer producer, StateStore stateStore,
DefaultMQAdminExt mqAdmin, MessageQueueListenerWrapper wrapper) {
this.unionConsumer = unionConsumer;
this.producer = producer;
this.mqAdmin = mqAdmin;
this.stateStore = stateStore;
this.wrapper = wrapper;
this.wrapper.setRecoverHandler((addQueue, removeQueue) -> {
try {
PlanetaryEngine.this.stateStore.recover(addQueue, removeQueue);
return null;
} catch (Throwable e) {
logger.error("recover error.", e);
return e;
}
});
Integer idleTime = (Integer) WorkerThread.this.properties.getOrDefault(StreamConfig.IDLE_TIME_TO_FIRE_WINDOW, 2000);
int commitInterval = (Integer) WorkerThread.this.properties.getOrDefault(StreamConfig.COMMIT_STATE_INTERNAL_MS, 2 * 1000);
this.idleWindowScaner = new IdleWindowScaner(idleTime, executor);
WorkerThread.this.executor.scheduleAtFixedRate(() -> {
try {
doCommit(mq2Commit);
} catch (Throwable t) {
logger.error("commit offset and state error.", t);
}
}, 1000, commitInterval, TimeUnit.MILLISECONDS);
}