in java/core/src/main/java/com/aliyun/openservices/tablestore/agent/memory/MemoryStoreImpl.java [659:739]
/**
 * Lists one page of messages belonging to a session and returns them together with a
 * continuation token for the following page.
 *
 * <p>Argument rules enforced by this method:
 * <ul>
 *   <li>{@code order} is required as soon as either create-time bound is supplied.</li>
 *   <li>When neither bound is supplied, {@code order} is set to {@code Order.DESC},
 *       replacing whatever value was passed in.</li>
 *   <li>When both bounds are supplied, the start bound must not lie "behind" the end bound
 *       in the scan direction: {@code start >= end} for {@code DESC},
 *       {@code start <= end} for {@code ASC}.</li>
 * </ul>
 *
 * <p>Violations of the rules above are reported with the exception produced by
 * {@code Exceptions.illegalArgument(...)}.
 *
 * @param sessionId                session whose messages are listed; checked with
 *                                 {@code ValidationUtils.ensureNotNull}
 * @param pageSize                 passed to the range iterator as its count limit
 * @param metadataFilter           optional filter forwarded unchanged to the range iterator
 * @param inclusiveStartCreateTime optional create-time value used for the start key; when
 *                                 {@code null} the open end of the range for the chosen order is used
 * @param inclusiveEndCreateTime   optional create-time value used for the end key; when
 *                                 {@code null} the open end of the range for the chosen order is used
 * @param order                    scan direction; see the rules above
 * @param nextToken                optional token returned by a previous call; when present the start
 *                                 key is decoded from it and {@code inclusiveStartCreateTime} is not
 *                                 used to build the start key
 * @param batchSize                forwarded unchanged to the range iterator
 * @return the collected messages plus a token encoding the iterator's next start primary key,
 *         or a {@code null} token when the iterator reports no next start primary key
 */
public Response<Message> listMessagesPaginated(
    String sessionId,
    int pageSize,
    Filter metadataFilter,
    Long inclusiveStartCreateTime,
    Long inclusiveEndCreateTime,
    Order order,
    String nextToken,
    Integer batchSize
) {
    log.info(
        "list messages paginated, sessionId:{}, pageSize:{}, metadataFilter:{}, inclusiveStartCreateTime:{}, inclusiveEndCreateTime:{}, order:{}, nextToken:{}, batchSize:{}",
        sessionId,
        pageSize,
        metadataFilter,
        inclusiveStartCreateTime,
        inclusiveEndCreateTime,
        order,
        nextToken,
        batchSize
    );
    ValidationUtils.ensureNotNull(sessionId, "sessionId");

    if (inclusiveStartCreateTime != null || inclusiveEndCreateTime != null) {
        // A bounded range is meaningless without knowing the scan direction.
        if (order == null) {
            throw Exceptions.illegalArgument("order is required when inclusiveStartCreateTime or inclusiveEndCreateTime is specified");
        }
    } else {
        // NOTE(review): this replaces a caller-supplied order (e.g. ASC) with DESC whenever no
        // time bound is given. Kept as-is for compatibility; confirm whether it should only
        // apply when order is null.
        order = Order.DESC;
    }

    if (inclusiveStartCreateTime != null && inclusiveEndCreateTime != null) {
        // The start bound must come first in scan direction; equal bounds are accepted.
        if (Order.DESC.equals(order) && inclusiveStartCreateTime < inclusiveEndCreateTime) {
            throw Exceptions.illegalArgument(
                "inclusiveStartCreateTime must be greater than or equal to inclusiveEndCreateTime, because the results are returned in reverse order of create time"
            );
        }
        if (Order.ASC.equals(order) && inclusiveStartCreateTime > inclusiveEndCreateTime) {
            throw Exceptions.illegalArgument(
                "inclusiveStartCreateTime must be less than or equal to inclusiveEndCreateTime, because the results are returned in order of create time"
            );
        }
    }

    // Sentinels for the unbounded ends of the range, flipped for a descending scan so that
    // "start" is always the first key visited.
    PrimaryKeyValue constMin;
    PrimaryKeyValue constMax;
    if (Order.ASC.equals(order)) {
        constMin = PrimaryKeyValue.INF_MIN;
        constMax = PrimaryKeyValue.INF_MAX;
    } else {
        constMin = PrimaryKeyValue.INF_MAX;
        constMax = PrimaryKeyValue.INF_MIN;
    }

    PrimaryKey start;
    if (nextToken != null) {
        // Continue from where the previous page stopped; the token fully determines the start key.
        start = TablestoreHelper.decodeNextPrimaryKeyToken(nextToken);
    } else {
        start = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn(Message.MESSAGE_SESSION_ID, PrimaryKeyValue.fromString(sessionId))
            .addPrimaryKeyColumn(Message.MESSAGE_CREATE_TIME, inclusiveStartCreateTime == null ? constMin : PrimaryKeyValue.fromLong(inclusiveStartCreateTime))
            .addPrimaryKeyColumn(Message.MESSAGE_MESSAGE_ID, constMin)
            .build();
    }

    PrimaryKey end = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn(Message.MESSAGE_SESSION_ID, PrimaryKeyValue.fromString(sessionId))
        .addPrimaryKeyColumn(Message.MESSAGE_CREATE_TIME, inclusiveEndCreateTime == null ? constMax : PrimaryKeyValue.fromLong(inclusiveEndCreateTime))
        .addPrimaryKeyColumn(Message.MESSAGE_MESSAGE_ID, constMax)
        .build();

    TablestoreHelper.GetRangeIterator<Message> rangeIterator = new TablestoreHelper.GetRangeIterator<>(
        client,
        messageTableName,
        TablestoreHelper::rowToMessage,
        start,
        end,
        metadataFilter,
        order,
        (long) pageSize,
        batchSize,
        null
    );

    // Drain the iterator first: the next start key is read only after the page has been collected.
    List<Message> messages = CollectionUtil.toList(rangeIterator);
    PrimaryKey nextStartPrimaryKey = rangeIterator.nextStartPrimaryKey();
    String token = nextStartPrimaryKey == null ? null : TablestoreHelper.encodeNextPrimaryKeyToken(nextStartPrimaryKey);
    return new Response<>(messages, token);
}