in store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTable.java [182:306]
public long binarySearchInCQByTime(String topic, int queueId, long high, long low, long timestamp,
long minPhysicOffset, BoundaryType boundaryType) throws RocksDBException {
long result = -1L;
long targetOffset = -1L, leftOffset = -1L, rightOffset = -1L;
long ceiling = high, floor = low;
// Handle the following corner cases first:
// 1. store time of (high) < timestamp
ByteBuffer buffer = getCQInKV(topic, queueId, ceiling);
if (buffer != null) {
long storeTime = buffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
if (storeTime < timestamp) {
switch (boundaryType) {
case LOWER:
return ceiling + 1;
case UPPER:
return ceiling;
default:
log.warn("Unknown boundary type");
break;
}
}
}
// 2. store time of (low) > timestamp
buffer = getCQInKV(topic, queueId, floor);
if (buffer != null) {
long storeTime = buffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
if (storeTime > timestamp) {
switch (boundaryType) {
case LOWER:
return floor;
case UPPER:
return 0;
default:
log.warn("Unknown boundary type");
break;
}
}
}
while (high >= low) {
long midOffset = low + ((high - low) >>> 1);
ByteBuffer byteBuffer = getCQInKV(topic, queueId, midOffset);
if (byteBuffer == null) {
ERROR_LOG.warn("binarySearchInCQByTimeStamp Failed. topic: {}, queueId: {}, timestamp: {}, result: null",
topic, queueId, timestamp);
low = midOffset + 1;
continue;
}
long phyOffset = byteBuffer.getLong(PHY_OFFSET_OFFSET);
if (phyOffset < minPhysicOffset) {
low = midOffset + 1;
leftOffset = midOffset;
continue;
}
long storeTime = byteBuffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
if (storeTime < 0) {
return 0;
} else if (storeTime == timestamp) {
targetOffset = midOffset;
break;
} else if (storeTime > timestamp) {
high = midOffset - 1;
rightOffset = midOffset;
} else {
low = midOffset + 1;
leftOffset = midOffset;
}
}
if (targetOffset != -1) {
// offset next to it might also share the same store-timestamp.
switch (boundaryType) {
case LOWER: {
while (true) {
long nextOffset = targetOffset - 1;
if (nextOffset < floor) {
break;
}
ByteBuffer byteBuffer = getCQInKV(topic, queueId, nextOffset);
long storeTime = byteBuffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
if (storeTime != timestamp) {
break;
}
targetOffset = nextOffset;
}
break;
}
case UPPER: {
while (true) {
long nextOffset = targetOffset + 1;
if (nextOffset > ceiling) {
break;
}
ByteBuffer byteBuffer = getCQInKV(topic, queueId, nextOffset);
long storeTime = byteBuffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
if (storeTime != timestamp) {
break;
}
targetOffset = nextOffset;
}
break;
}
default: {
log.warn("Unknown boundary type");
break;
}
}
result = targetOffset;
} else {
switch (boundaryType) {
case LOWER: {
result = rightOffset;
break;
}
case UPPER: {
result = leftOffset;
break;
}
default: {
log.warn("Unknown boundary type");
break;
}
}
}
return result;
}