public long binarySearchInCQByTime()

in store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTable.java [182:306]


    public long binarySearchInCQByTime(String topic, int queueId, long high, long low, long timestamp,
        long minPhysicOffset, BoundaryType boundaryType) throws RocksDBException {
        long result = -1L;
        long targetOffset = -1L, leftOffset = -1L, rightOffset = -1L;
        long ceiling = high, floor = low;
        // Handle the following corner cases first:
        // 1. store time of (high) < timestamp
        ByteBuffer buffer = getCQInKV(topic, queueId, ceiling);
        if (buffer != null) {
            long storeTime = buffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
            if (storeTime < timestamp) {
                switch (boundaryType) {
                    case LOWER:
                        return ceiling + 1;
                    case UPPER:
                        return ceiling;
                    default:
                        log.warn("Unknown boundary type");
                        break;
                }
            }
        }
        // 2. store time of (low) > timestamp
        buffer = getCQInKV(topic, queueId, floor);
        if (buffer != null) {
            long storeTime = buffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
            if (storeTime > timestamp) {
                switch (boundaryType) {
                    case LOWER:
                        return floor;
                    case UPPER:
                        return 0;
                    default:
                        log.warn("Unknown boundary type");
                        break;
                }
            }
        }
        while (high >= low) {
            long midOffset = low + ((high - low) >>> 1);
            ByteBuffer byteBuffer = getCQInKV(topic, queueId, midOffset);
            if (byteBuffer == null) {
                ERROR_LOG.warn("binarySearchInCQByTimeStamp Failed. topic: {}, queueId: {}, timestamp: {}, result: null",
                    topic, queueId, timestamp);
                low = midOffset + 1;
                continue;
            }

            long phyOffset = byteBuffer.getLong(PHY_OFFSET_OFFSET);
            if (phyOffset < minPhysicOffset) {
                low = midOffset + 1;
                leftOffset = midOffset;
                continue;
            }
            long storeTime = byteBuffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
            if (storeTime < 0) {
                return 0;
            } else if (storeTime == timestamp) {
                targetOffset = midOffset;
                break;
            } else if (storeTime > timestamp) {
                high = midOffset - 1;
                rightOffset = midOffset;
            } else {
                low = midOffset + 1;
                leftOffset = midOffset;
            }
        }
        if (targetOffset != -1) {
            // offset next to it might also share the same store-timestamp.
            switch (boundaryType) {
                case LOWER: {
                    while (true) {
                        long nextOffset = targetOffset - 1;
                        if (nextOffset < floor) {
                            break;
                        }
                        ByteBuffer byteBuffer = getCQInKV(topic, queueId, nextOffset);
                        long storeTime = byteBuffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
                        if (storeTime != timestamp) {
                            break;
                        }
                        targetOffset = nextOffset;
                    }
                    break;
                }
                case UPPER: {
                    while (true) {
                        long nextOffset = targetOffset + 1;
                        if (nextOffset > ceiling) {
                            break;
                        }
                        ByteBuffer byteBuffer = getCQInKV(topic, queueId, nextOffset);
                        long storeTime = byteBuffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
                        if (storeTime != timestamp) {
                            break;
                        }
                        targetOffset = nextOffset;
                    }
                    break;
                }
                default: {
                    log.warn("Unknown boundary type");
                    break;
                }
            }
            result = targetOffset;
        } else {
            switch (boundaryType) {
                case LOWER: {
                    result = rightOffset;
                    break;
                }
                case UPPER: {
                    result = leftOffset;
                    break;
                }
                default: {
                    log.warn("Unknown boundary type");
                    break;
                }
            }
        }
        return result;
    }