private long binarySearchInQueueByTime()

in store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java [220:399]


    private long binarySearchInQueueByTime(final MappedFile mappedFile, final long timestamp,
        BoundaryType boundaryType) {
        if (mappedFile != null) {
            long offset = 0;
            int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
            int high = 0;
            int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
            long minPhysicOffset = this.messageStore.getMinPhyOffset();
            int range = mappedFile.getFileSize();
            if (mappedFile.getWrotePosition() != 0 && mappedFile.getWrotePosition() != mappedFile.getFileSize()) {
                // mappedFile is the last one and is currently being written.
                range = mappedFile.getWrotePosition();
            }
            SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0, range);
            if (null != sbr) {
                ByteBuffer byteBuffer = sbr.getByteBuffer();
                int ceiling = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
                int floor = low;
                high = ceiling;
                try {
                    // Handle the following corner cases first:
                    // 1. store time of (high) < timestamp
                    // 2. store time of (low) > timestamp
                    long storeTime;
                    long phyOffset;
                    int size;
                    // Handle case 1
                    byteBuffer.position(ceiling);
                    phyOffset = byteBuffer.getLong();
                    size = byteBuffer.getInt();
                    storeTime = messageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                    if (storeTime < timestamp) {
                        switch (boundaryType) {
                            case LOWER:
                                return (mappedFile.getFileFromOffset() + ceiling + CQ_STORE_UNIT_SIZE) / CQ_STORE_UNIT_SIZE;
                            case UPPER:
                                return (mappedFile.getFileFromOffset() + ceiling) / CQ_STORE_UNIT_SIZE;
                            default:
                                log.warn("Unknown boundary type");
                                break;
                        }
                    }

                    // Handle case 2
                    byteBuffer.position(floor);
                    phyOffset = byteBuffer.getLong();
                    size = byteBuffer.getInt();
                    storeTime = messageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                    if (storeTime > timestamp) {
                        switch (boundaryType) {
                            case LOWER:
                                return mappedFile.getFileFromOffset() / CQ_STORE_UNIT_SIZE;
                            case UPPER:
                                return 0;
                            default:
                                log.warn("Unknown boundary type");
                                break;
                        }
                    }

                    // Perform binary search
                    while (high >= low) {
                        midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
                        byteBuffer.position(midOffset);
                        phyOffset = byteBuffer.getLong();
                        size = byteBuffer.getInt();
                        if (phyOffset < minPhysicOffset) {
                            low = midOffset + CQ_STORE_UNIT_SIZE;
                            leftOffset = midOffset;
                            continue;
                        }

                        storeTime = this.messageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                        if (storeTime < 0) {
                            log.warn("Failed to query store timestamp for commit log offset: {}", phyOffset);
                            return 0;
                        } else if (storeTime == timestamp) {
                            targetOffset = midOffset;
                            break;
                        } else if (storeTime > timestamp) {
                            high = midOffset - CQ_STORE_UNIT_SIZE;
                            rightOffset = midOffset;
                        } else {
                            low = midOffset + CQ_STORE_UNIT_SIZE;
                            leftOffset = midOffset;
                        }
                    }

                    if (targetOffset != -1) {
                        // We just found ONE matched record. These next to it might also share the same store-timestamp.
                        offset = targetOffset;
                        switch (boundaryType) {
                            case LOWER: {
                                int previousAttempt = targetOffset;
                                while (true) {
                                    int attempt = previousAttempt - CQ_STORE_UNIT_SIZE;
                                    if (attempt < floor) {
                                        break;
                                    }
                                    byteBuffer.position(attempt);
                                    long physicalOffset = byteBuffer.getLong();
                                    int messageSize = byteBuffer.getInt();
                                    long messageStoreTimestamp = messageStore.getCommitLog()
                                        .pickupStoreTimestamp(physicalOffset, messageSize);
                                    if (messageStoreTimestamp == timestamp) {
                                        previousAttempt = attempt;
                                        continue;
                                    }
                                    break;
                                }
                                offset = previousAttempt;
                                break;
                            }
                            case UPPER: {
                                int previousAttempt = targetOffset;
                                while (true) {
                                    int attempt = previousAttempt + CQ_STORE_UNIT_SIZE;
                                    if (attempt > ceiling) {
                                        break;
                                    }
                                    byteBuffer.position(attempt);
                                    long physicalOffset = byteBuffer.getLong();
                                    int messageSize = byteBuffer.getInt();
                                    long messageStoreTimestamp = messageStore.getCommitLog()
                                        .pickupStoreTimestamp(physicalOffset, messageSize);
                                    if (messageStoreTimestamp == timestamp) {
                                        previousAttempt = attempt;
                                        continue;
                                    }
                                    break;
                                }
                                offset = previousAttempt;
                                break;
                            }
                            default: {
                                log.warn("Unknown boundary type");
                                break;
                            }
                        }
                    } else {
                        // Given timestamp does not have any message records. But we have a range enclosing the
                        // timestamp.
                        /*
                         * Consider the follow case: t2 has no consume queue entry and we are searching offset of
                         * t2 for lower and upper boundaries.
                         *  --------------------------
                         *   timestamp   Consume Queue
                         *       t1          1
                         *       t1          2
                         *       t1          3
                         *       t3          4
                         *       t3          5
                         *   --------------------------
                         * Now, we return 3 as upper boundary of t2 and 4 as its lower boundary. It looks
                         * contradictory at first sight, but it does make sense when performing range queries.
                         */
                        switch (boundaryType) {
                            case LOWER: {
                                offset = rightOffset;
                                break;
                            }

                            case UPPER: {
                                offset = leftOffset;
                                break;
                            }
                            default: {
                                log.warn("Unknown boundary type");
                                break;
                            }
                        }
                    }
                    return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
                } finally {
                    sbr.release();
                }
            }
        }
        return 0;
    }