in src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java [470:530]
/**
 * Advances the start offsets of the given queues by a combined total of
 * {@code pageNum * pageSize} positions, spreading the advance across the queues.
 *
 * <p>The queues are ordered by {@code getEnd() - getStart()} ascending. Working from the
 * smallest, every queue from the current one onwards is advanced by the same amount: either
 * the current queue's whole remaining span ({@code getEnd() - getStartOffset()}) or, when
 * that would exceed the outstanding count, an equal share of the outstanding count. Any
 * count left after that is consumed one position per queue in a single pass over
 * {@code queueOffsets} in list order.
 *
 * @param queueOffsets per-queue offset state; the entries are mutated in place
 * @param query        supplies the page number and page size whose product is the count to skip
 * @return the list position, modulo the queue count, following the last entry examined by the
 *         final one-position-per-queue pass; {@code 0} when that pass examined nothing or the
 *         count to skip is not positive
 */
private int moveStartOffset(List<QueueOffsetInfo> queueOffsets, MessageQueryByPage query) {
    int size = queueOffsets.size();
    int next = 0;
    // Widen before multiplying: the product is stored in a long, so it must not be
    // computed (and possibly wrapped) in a narrower type first.
    long offset = (long) query.getPageNum() * query.getPageSize();
    // Nothing to skip. A negative count is treated the same way; otherwise it would slip
    // past this guard and make the final pass advance every non-exhausted queue by one.
    if (offset <= 0) {
        return next;
    }
    // sort by queueOffset size
    List<QueueOffsetInfo> orderQueue = queueOffsets
        .stream()
        .sorted((o1, o2) -> Long.compare(o1.getEnd() - o1.getStart(), o2.getEnd() - o2.getStart()))
        .collect(Collectors.toList());
    // Take the smallest one each time. Stop as soon as the outstanding count can no longer
    // give at least one position to each queue still under consideration.
    for (int i = 0; i < size && offset >= (size - i); i++) {
        int remainingQueues = size - i;
        long minSize = orderQueue.get(i).getEnd() - orderQueue.get(i).getStartOffset();
        if (minSize == 0) {
            // This queue has no span left to advance over.
            continue;
        }
        long reduce = minSize * remainingQueues;
        if (reduce <= offset) {
            // Advance every remaining queue by the current queue's whole remaining span.
            offset -= reduce;
            for (int j = i; j < size; j++) {
                orderQueue.get(j).incStartOffset(minSize);
            }
        } else {
            // Not enough outstanding for a whole span: give each remaining queue an equal
            // share and leave the division remainder in offset for the final pass.
            long addOffset = offset / remainingQueues;
            offset -= addOffset * remainingQueues;
            if (addOffset != 0) {
                for (int j = i; j < size; j++) {
                    orderQueue.get(j).incStartOffset(addOffset);
                }
            }
        }
    }
    // Copy the advanced offsets back to the entry at each queue's getIdx() position.
    // NOTE(review): orderQueue holds the same instances as queueOffsets (sorting a stream
    // does not copy elements), so this is a self-assignment if getIdx() is each entry's
    // own position in queueOffsets - confirm.
    for (QueueOffsetInfo info : orderQueue) {
        QueueOffsetInfo queueOffsetInfo = queueOffsets.get(info.getIdx());
        queueOffsetInfo.setStartOffset(info.getStartOffset());
        queueOffsetInfo.setEndOffset(info.getEndOffset());
    }
    // Consume what is left, one entry at a time in list order. next moves on for every
    // entry examined, whether or not that entry could be advanced.
    for (QueueOffsetInfo info : queueOffsets) {
        if (offset == 0) {
            break;
        }
        next = (next + 1) % size;
        if (info.getStartOffset() < info.getEnd()) {
            // presumably advances the start offset by one, matching the decrement below
            info.incStartOffset();
            --offset;
        }
    }
    return next;
}