in fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java [387:465]
/**
 * Builds the fetch-log requests for this round, one per leader server, from the currently
 * fetchable buckets.
 *
 * <p>A bucket is left out of this round when:
 *
 * <ul>
 *   <li>no offset is recorded for it in {@code logScannerStatus} (it has been unsubscribed);
 *   <li>its leader is unknown, in which case a metadata update for the table/partition is
 *       requested;
 *   <li>its leader is contained in {@code nodesWithPendingFetchRequests}.
 * </ul>
 *
 * <p>The table id of the first fetchable bucket is used for every request that is built.
 *
 * @return a map from leader server id to the request destined for that server, or an empty map
 *     when no bucket is ready to be fetched
 */
private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
    // Per-bucket requests grouped by the server that currently leads the bucket.
    Map<Integer, List<PbFetchLogReqForBucket>> bucketReqsByLeader = new HashMap<>();
    Long tableId = null;

    for (TableBucket tb : fetchableBuckets()) {
        // Remember the table id of the first bucket seen, even if that bucket ends up skipped.
        if (tableId == null) {
            tableId = tb.getTableId();
        }

        Long offset = logScannerStatus.getBucketOffset(tb);
        if (offset == null) {
            LOG.debug(
                    "Skipping fetch request for bucket {} because the bucket has been "
                            + "unsubscribed.",
                    tb);
            continue;
        }

        // TODO add select preferred read replica, currently we can only read from leader.
        Integer leader = getTableBucketLeader(tb);
        if (leader == null) {
            LOG.trace(
                    "Skipping fetch request for bucket {} because leader is not available.",
                    tb);
            // The leader of this bucket is unknown, so ask for the latest metadata of the
            // table (or partition) before the next round.
            metadataUpdater.updateTableOrPartitionMetadata(tablePath, tb.getPartitionId());
            continue;
        }

        if (nodesWithPendingFetchRequests.contains(leader)) {
            LOG.trace(
                    "Skipping fetch request for bucket {} because previous request "
                            + "to server {} has not been processed.",
                    tb,
                    leader);
            continue;
        }

        PbFetchLogReqForBucket bucketReq =
                new PbFetchLogReqForBucket()
                        .setBucketId(tb.getBucket())
                        .setFetchOffset(offset)
                        .setMaxFetchBytes(maxBucketFetchBytes);
        // The partition id is only set when the bucket carries one.
        if (tb.getPartitionId() != null) {
            bucketReq.setPartitionId(tb.getPartitionId());
        }
        bucketReqsByLeader.computeIfAbsent(leader, ignored -> new ArrayList<>()).add(bucketReq);
    }

    // Nothing was grouped, so no bucket is ready for fetching in this round.
    if (bucketReqsByLeader.isEmpty()) {
        return Collections.emptyMap();
    }

    // Non-null here: at least one bucket was iterated, so the table id has been captured.
    long resolvedTableId = tableId;
    Map<Integer, FetchLogRequest> requestsByLeader = new HashMap<>();
    for (Map.Entry<Integer, List<PbFetchLogReqForBucket>> entry :
            bucketReqsByLeader.entrySet()) {
        PbFetchLogReqForTable tableReq = new PbFetchLogReqForTable().setTableId(resolvedTableId);
        if (readContext.isProjectionPushDowned()) {
            assert projection != null;
            tableReq.setProjectionPushdownEnabled(true)
                    .setProjectedFields(projection.getProjectionInOrder());
        } else {
            tableReq.setProjectionPushdownEnabled(false);
        }
        tableReq.addAllBucketsReqs(entry.getValue());

        FetchLogRequest request =
                new FetchLogRequest()
                        .setFollowerServerId(-1)
                        .setMaxBytes(maxFetchBytes)
                        .setMinBytes(minFetchBytes)
                        .setMaxWaitMs(maxFetchWaitMs);
        request.addAllTablesReqs(Collections.singletonList(tableReq));
        requestsByLeader.put(entry.getKey(), request);
    }
    return requestsByLeader;
}