in store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java [220:399]
private long binarySearchInQueueByTime(final MappedFile mappedFile, final long timestamp,
BoundaryType boundaryType) {
if (mappedFile != null) {
long offset = 0;
int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
int high = 0;
int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
long minPhysicOffset = this.messageStore.getMinPhyOffset();
int range = mappedFile.getFileSize();
if (mappedFile.getWrotePosition() != 0 && mappedFile.getWrotePosition() != mappedFile.getFileSize()) {
// mappedFile is the last one and is currently being written.
range = mappedFile.getWrotePosition();
}
SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0, range);
if (null != sbr) {
ByteBuffer byteBuffer = sbr.getByteBuffer();
int ceiling = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
int floor = low;
high = ceiling;
try {
// Handle the following corner cases first:
// 1. store time of (high) < timestamp
// 2. store time of (low) > timestamp
long storeTime;
long phyOffset;
int size;
// Handle case 1
byteBuffer.position(ceiling);
phyOffset = byteBuffer.getLong();
size = byteBuffer.getInt();
storeTime = messageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
if (storeTime < timestamp) {
switch (boundaryType) {
case LOWER:
return (mappedFile.getFileFromOffset() + ceiling + CQ_STORE_UNIT_SIZE) / CQ_STORE_UNIT_SIZE;
case UPPER:
return (mappedFile.getFileFromOffset() + ceiling) / CQ_STORE_UNIT_SIZE;
default:
log.warn("Unknown boundary type");
break;
}
}
// Handle case 2
byteBuffer.position(floor);
phyOffset = byteBuffer.getLong();
size = byteBuffer.getInt();
storeTime = messageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
if (storeTime > timestamp) {
switch (boundaryType) {
case LOWER:
return mappedFile.getFileFromOffset() / CQ_STORE_UNIT_SIZE;
case UPPER:
return 0;
default:
log.warn("Unknown boundary type");
break;
}
}
// Perform binary search
while (high >= low) {
midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
byteBuffer.position(midOffset);
phyOffset = byteBuffer.getLong();
size = byteBuffer.getInt();
if (phyOffset < minPhysicOffset) {
low = midOffset + CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
continue;
}
storeTime = this.messageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
if (storeTime < 0) {
log.warn("Failed to query store timestamp for commit log offset: {}", phyOffset);
return 0;
} else if (storeTime == timestamp) {
targetOffset = midOffset;
break;
} else if (storeTime > timestamp) {
high = midOffset - CQ_STORE_UNIT_SIZE;
rightOffset = midOffset;
} else {
low = midOffset + CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
}
}
if (targetOffset != -1) {
// We just found ONE matched record. These next to it might also share the same store-timestamp.
offset = targetOffset;
switch (boundaryType) {
case LOWER: {
int previousAttempt = targetOffset;
while (true) {
int attempt = previousAttempt - CQ_STORE_UNIT_SIZE;
if (attempt < floor) {
break;
}
byteBuffer.position(attempt);
long physicalOffset = byteBuffer.getLong();
int messageSize = byteBuffer.getInt();
long messageStoreTimestamp = messageStore.getCommitLog()
.pickupStoreTimestamp(physicalOffset, messageSize);
if (messageStoreTimestamp == timestamp) {
previousAttempt = attempt;
continue;
}
break;
}
offset = previousAttempt;
break;
}
case UPPER: {
int previousAttempt = targetOffset;
while (true) {
int attempt = previousAttempt + CQ_STORE_UNIT_SIZE;
if (attempt > ceiling) {
break;
}
byteBuffer.position(attempt);
long physicalOffset = byteBuffer.getLong();
int messageSize = byteBuffer.getInt();
long messageStoreTimestamp = messageStore.getCommitLog()
.pickupStoreTimestamp(physicalOffset, messageSize);
if (messageStoreTimestamp == timestamp) {
previousAttempt = attempt;
continue;
}
break;
}
offset = previousAttempt;
break;
}
default: {
log.warn("Unknown boundary type");
break;
}
}
} else {
// Given timestamp does not have any message records. But we have a range enclosing the
// timestamp.
/*
* Consider the follow case: t2 has no consume queue entry and we are searching offset of
* t2 for lower and upper boundaries.
* --------------------------
* timestamp Consume Queue
* t1 1
* t1 2
* t1 3
* t3 4
* t3 5
* --------------------------
* Now, we return 3 as upper boundary of t2 and 4 as its lower boundary. It looks
* contradictory at first sight, but it does make sense when performing range queries.
*/
switch (boundaryType) {
case LOWER: {
offset = rightOffset;
break;
}
case UPPER: {
offset = leftOffset;
break;
}
default: {
log.warn("Unknown boundary type");
break;
}
}
}
return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
} finally {
sbr.release();
}
}
}
return 0;
}