in broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java [628:762]
protected void initializeBrokerScheduledTasks() {
final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
final long period = TimeUnit.DAYS.toMillis(1);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
LOG.error("BrokerController: failed to record broker stats", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
LOG.error(
"BrokerController: failed to persist config file of consumerOffset", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerFilterManager.persist();
BrokerController.this.consumerOrderInfoManager.persist();
} catch (Throwable e) {
LOG.error(
"BrokerController: failed to persist config file of consumerFilter or consumerOrderInfo",
e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
LOG.error("BrokerController: failed to protectBroker", e);
}
}
}, 3, 3, TimeUnit.MINUTES);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
LOG.error("BrokerController: failed to print broker watermark", e);
}
}
}, 10, 1, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.messageStore.getTimerMessageStore().getTimerMetrics()
.cleanMetrics(BrokerController.this.topicConfigManager.getTopicConfigTable().keySet());
} catch (Throwable e) {
LOG.error("BrokerController: failed to clean unused timer metrics.", e);
}
}
}, 3, 3, TimeUnit.MINUTES);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
LOG.info("Dispatch task fall behind commit log {}bytes",
BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
LOG.error("Failed to print dispatchBehindBytes", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
if (!messageStoreConfig.isEnableDLegerCommitLog() && !messageStoreConfig.isDuplicationEnable() && !brokerConfig.isEnableControllerMode()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= HA_ADDRESS_MIN_LENGTH) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (System.currentTimeMillis() - lastSyncTimeMs > 60 * 1000) {
BrokerController.this.getSlaveSynchronize().syncAll();
lastSyncTimeMs = System.currentTimeMillis();
}
//timer checkpoint, latency-sensitive, so sync it more frequently
if (messageStoreConfig.isTimerWheelEnable()) {
BrokerController.this.getSlaveSynchronize().syncTimerCheckPoint();
}
} catch (Throwable e) {
LOG.error("Failed to sync all config for slave.", e);
}
}
}, 1000 * 10, 3 * 1000, TimeUnit.MILLISECONDS);
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
LOG.error("Failed to print diff of master and slave.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
if (this.brokerConfig.isEnableControllerMode()) {
this.updateMasterHAServerAddrPeriodically = true;
}
}