private void scan()

in broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java [217:334]


    /**
     * Performs one pass over the buffered pop check points.
     * <p>
     * For every wrapper held in {@code buffer} the pass does one of the following:
     * <ul>
     *   <li>removes it from the buffer (and decrements {@code counter}) when it is already finished;</li>
     *   <li>hands its check point to {@code putCkToStore} when it has no revive queue offset yet
     *       (always for "just offset" wrappers, otherwise only once the entry has to leave the buffer);</li>
     *   <li>once the check point is stored, forwards every ack that is set in {@code getBits()} but not in
     *       {@code getToStoreBits()} through {@code putAckToStore} or {@code putBatchAckToStore}, and
     *       removes the wrapper if that completes it.</li>
     * </ul>
     * After the walk it calls {@code scanCommitOffset()}, logs and records the time the pass took, sets
     * {@code serving} to {@code false} when the pass took longer than {@code popCkStayBufferTimeOut - 1000},
     * and resets {@code counter} to the real buffer size once {@code scanTimes} reaches
     * {@code countOfMinute1}.
     */
    private void scan() {
        final long scanBegin = System.currentTimeMillis();
        final AtomicInteger ackToStoreCount = new AtomicInteger(0);
        int ckToStoreCount = 0;

        for (Iterator<Map.Entry<String, PopCheckPointWrapper>> it = buffer.entrySet().iterator(); it.hasNext(); ) {
            final PopCheckPointWrapper wrapper = it.next().getValue();

            // Nothing left to do for this entry: either an offset-only wrapper whose ck is stored,
            // a ck that is done, or a ck that is done-for-finish and already stored.
            final boolean finished = (wrapper.isJustOffset() && wrapper.isCkStored())
                || isCkDone(wrapper)
                || (isCkDoneForFinish(wrapper) && wrapper.isCkStored());
            if (finished) {
                if (brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.info("[PopBuffer]ck done, {}", wrapper);
                }
                it.remove();
                counter.decrementAndGet();
                continue;
            }

            final PopCheckPoint ck = wrapper.getCk();
            final long current = System.currentTimeMillis();
            final long stayedMillis = current - ck.getPopTime();

            // The entry must leave the buffer when the service is not serving, when the ck is about to
            // reach its revive time, or when it has stayed in the buffer for too long.
            final boolean mustFlush = !this.serving
                || ck.getReviveTime() - current < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()
                || stayedMillis > brokerController.getBrokerConfig().getPopCkStayBufferTime();

            if (stayedMillis > brokerController.getBrokerConfig().getPopCkStayBufferTime() * 2L) {
                POP_LOGGER.warn("[PopBuffer]ck finish fail, stay too long, {}", wrapper);
            }

            // Re-check: the ck may have become done since the test at the top of the loop.
            if (isCkDone(wrapper)) {
                continue;
            }

            final boolean offsetOnly = wrapper.isJustOffset();
            if (!offsetOnly && !mustFlush) {
                // Still allowed to stay in the buffer.
                continue;
            }

            // No revive queue offset yet: hand the ck over to the store.
            if (wrapper.getReviveQueueOffset() < 0) {
                putCkToStore(wrapper, this.brokerController.getBrokerConfig().isAppendCkAsync());
                ckToStoreCount++;
            }

            // Offset-only wrappers carry no buffered acks; for the rest, acks are only written
            // once the ck itself is marked as stored.
            if (offsetOnly || !wrapper.isCkStored()) {
                continue;
            }

            if (!brokerController.getBrokerConfig().isEnablePopBatchAck()) {
                // One store request per ack that is set in the buffer but not yet sent to the store.
                for (byte idx = 0; idx < ck.getNum(); idx++) {
                    if (DataConverter.getBit(wrapper.getBits().get(), idx)
                        && !DataConverter.getBit(wrapper.getToStoreBits().get(), idx)) {
                        putAckToStore(wrapper, idx, ackToStoreCount);
                    }
                }
            } else {
                // Collect the same set of indexes into the reusable list and send them in one request.
                final List<Byte> pendingIndexes = this.batchAckIndexList;
                try {
                    for (byte idx = 0; idx < ck.getNum(); idx++) {
                        if (DataConverter.getBit(wrapper.getBits().get(), idx)
                            && !DataConverter.getBit(wrapper.getToStoreBits().get(), idx)) {
                            pendingIndexes.add(idx);
                        }
                    }
                    if (!pendingIndexes.isEmpty()) {
                        putBatchAckToStore(wrapper, pendingIndexes, ackToStoreCount);
                    }
                } finally {
                    // The list is a shared field, so it is always emptied before the next entry.
                    pendingIndexes.clear();
                }
            }

            if (isCkDoneForFinish(wrapper) && wrapper.isCkStored()) {
                if (brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.info("[PopBuffer]ck finish, {}", wrapper);
                }
                it.remove();
                counter.decrementAndGet();
            }
        }

        final int offsetBufferSize = scanCommitOffset();

        final long elapsedMillis = System.currentTimeMillis() - scanBegin;
        if (elapsedMillis > brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() - 1000) {
            // The pass took too long: report it and stop serving.
            POP_LOGGER.warn("[PopBuffer]scan stop, because eclipse too long, PopBufferEclipse={}, " +
                    "PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",
                elapsedMillis, ackToStoreCount.get(), ckToStoreCount, counter.get(), offsetBufferSize);
            this.serving = false;
        } else if (scanTimes % countOfSecond1 == 0) {
            // Periodic statistics, emitted once every countOfSecond1 passes.
            POP_LOGGER.info("[PopBuffer]scan, PopBufferEclipse={}, " +
                    "PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",
                elapsedMillis, ackToStoreCount.get(), ckToStoreCount, counter.get(), offsetBufferSize);
        }
        PopMetricsManager.recordPopBufferScanTimeConsume(elapsedMillis);
        scanTimes++;

        // Every countOfMinute1 passes, re-align the counter with the actual buffer size.
        if (scanTimes >= countOfMinute1) {
            counter.set(this.buffer.size());
            scanTimes = 0;
        }
    }