in broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java [217:334]
/**
 * Performs one pass over {@code buffer}.
 *
 * <p>For every buffered checkpoint wrapper the pass does the following, in order:
 * <ol>
 *   <li>Removes the entry (and decrements {@code counter}) when it is already finished.</li>
 *   <li>Otherwise, for offset-only entries and for entries that must leave the buffer
 *       (service not serving, revive time close, or stayed too long), calls
 *       {@code putCkToStore} if the wrapper has no revive-queue offset yet.</li>
 *   <li>For non-offset-only entries whose checkpoint is reported as stored, hands every
 *       index whose bit is set in {@code getBits()} but not in {@code getToStoreBits()}
 *       to {@code putAckToStore} / {@code putBatchAckToStore}, then removes the entry
 *       if it is finished.</li>
 * </ol>
 *
 * <p>After the loop it calls {@code scanCommitOffset()}, logs and records the time the
 * pass took, and sets {@code serving} to {@code false} when the pass took longer than
 * {@code popCkStayBufferTimeOut - 1000}. Every {@code countOfMinute1} passes,
 * {@code counter} is reset to {@code buffer.size()}.
 */
private void scan() {
    final long scanBegin = System.currentTimeMillis();
    final AtomicInteger ackStoredCount = new AtomicInteger(0);
    int ckStoredCount = 0;

    for (Iterator<Map.Entry<String, PopCheckPointWrapper>> it = buffer.entrySet().iterator(); it.hasNext(); ) {
        final PopCheckPointWrapper wrapper = it.next().getValue();

        // Nothing left to do for this entry: either it only carries an offset that is
        // already stored, or the checkpoint is done (for the "finish" variant the
        // checkpoint must additionally be stored).
        if ((wrapper.isJustOffset() && wrapper.isCkStored())
            || isCkDone(wrapper)
            || (isCkDoneForFinish(wrapper) && wrapper.isCkStored())) {
            if (brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.info("[PopBuffer]ck done, {}", wrapper);
            }
            it.remove();
            counter.decrementAndGet();
            continue;
        }

        final PopCheckPoint ck = wrapper.getCk();
        final long now = System.currentTimeMillis();
        final long stayedMillis = now - ck.getPopTime();

        // The entry has to leave the buffer when the service is not serving, when its
        // revive time is closer than the configured time-out, or when it has stayed
        // longer than the configured stay time.
        final boolean notServing = !this.serving;
        final boolean closeToRevive =
            ck.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut();
        final boolean stayedTooLong =
            stayedMillis > brokerController.getBrokerConfig().getPopCkStayBufferTime();
        final boolean mustLeaveBuffer = notServing || closeToRevive || stayedTooLong;

        if (stayedMillis > brokerController.getBrokerConfig().getPopCkStayBufferTime() * 2L) {
            POP_LOGGER.warn("[PopBuffer]ck finish fail, stay too long, {}", wrapper);
        }

        // Re-check: the checkpoint may have become done since the test at the top.
        if (isCkDone(wrapper)) {
            continue;
        }

        final boolean justOffset = wrapper.isJustOffset();
        if (!justOffset && !mustLeaveBuffer) {
            // Neither offset-only nor due to leave the buffer: keep it for a later pass.
            continue;
        }

        // Both remaining cases need the checkpoint in the store; a negative revive-queue
        // offset is treated here as "not handed to the store yet".
        if (wrapper.getReviveQueueOffset() < 0) {
            putCkToStore(wrapper, this.brokerController.getBrokerConfig().isAppendCkAsync());
            ckStoredCount++;
        }

        // Offset-only entries stop here; the others wait until the checkpoint is stored.
        if (justOffset || !wrapper.isCkStored()) {
            continue;
        }

        // Hand buffered acks that have not gone to the store yet over to the store.
        if (brokerController.getBrokerConfig().isEnablePopBatchAck()) {
            final List<Byte> pendingIndexes = this.batchAckIndexList;
            try {
                for (byte idx = 0; idx < ck.getNum(); idx++) {
                    if (!DataConverter.getBit(wrapper.getBits().get(), idx)) {
                        continue;
                    }
                    if (DataConverter.getBit(wrapper.getToStoreBits().get(), idx)) {
                        continue;
                    }
                    pendingIndexes.add(idx);
                }
                if (!pendingIndexes.isEmpty()) {
                    putBatchAckToStore(wrapper, pendingIndexes, ackStoredCount);
                }
            } finally {
                // The list is a field reused across entries, so always leave it empty.
                pendingIndexes.clear();
            }
        } else {
            for (byte idx = 0; idx < ck.getNum(); idx++) {
                if (!DataConverter.getBit(wrapper.getBits().get(), idx)) {
                    continue;
                }
                if (DataConverter.getBit(wrapper.getToStoreBits().get(), idx)) {
                    continue;
                }
                putAckToStore(wrapper, idx, ackStoredCount);
            }
        }

        if (isCkDoneForFinish(wrapper) && wrapper.isCkStored()) {
            if (brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.info("[PopBuffer]ck finish, {}", wrapper);
            }
            it.remove();
            counter.decrementAndGet();
        }
    }

    final int offsetBufferSize = scanCommitOffset();
    final long elapsedMillis = System.currentTimeMillis() - scanBegin;

    if (elapsedMillis > brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() - 1000) {
        // The pass took too long relative to the configured time-out: stop serving.
        POP_LOGGER.warn("[PopBuffer]scan stop, because eclipse too long, PopBufferEclipse={}, " +
                "PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",
            elapsedMillis, ackStoredCount.get(), ckStoredCount, counter.get(), offsetBufferSize);
        this.serving = false;
    } else if (scanTimes % countOfSecond1 == 0) {
        // Periodic statistics line, emitted once every countOfSecond1 passes.
        POP_LOGGER.info("[PopBuffer]scan, PopBufferEclipse={}, " +
                "PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",
            elapsedMillis, ackStoredCount.get(), ckStoredCount, counter.get(), offsetBufferSize);
    }

    PopMetricsManager.recordPopBufferScanTimeConsume(elapsedMillis);

    scanTimes++;
    if (scanTimes >= countOfMinute1) {
        // Periodically reset the counter to the actual buffer size.
        counter.set(this.buffer.size());
        scanTimes = 0;
    }
}