private Map<Integer, FetchLogRequest> prepareFetchLogRequests()

in fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java [387:465]


    /**
     * Builds the fetch-log requests for the current round, grouped by the leader server of each
     * fetchable bucket.
     *
     * <p>A bucket is left out of this round when:
     *
     * <ul>
     *   <li>{@code logScannerStatus} has no offset for it (logged as unsubscribed);
     *   <li>its leader is unknown, in which case a metadata update for the table or partition is
     *       requested;
     *   <li>its leader is contained in {@code nodesWithPendingFetchRequests}.
     * </ul>
     *
     * <p>The table id of the first fetchable bucket is used for every request that is built.
     *
     * @return a map from leader server id to the request destined for that server, or an empty map
     *     when no bucket is ready to be fetched
     */
    private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
        Map<Integer, List<PbFetchLogReqForBucket>> bucketReqsByLeader = new HashMap<>();
        Long tableId = null;

        for (TableBucket bucket : fetchableBuckets()) {
            // Remember the table id of the first bucket, even if that bucket ends up skipped.
            if (tableId == null) {
                tableId = bucket.getTableId();
            }

            Long fetchOffset = logScannerStatus.getBucketOffset(bucket);
            if (fetchOffset == null) {
                LOG.debug(
                        "Skipping fetch request for bucket {} because the bucket has been "
                                + "unsubscribed.",
                        bucket);
                continue;
            }

            // TODO add select preferred read replica, currently we can only read from leader.
            Integer leaderId = getTableBucketLeader(bucket);
            if (leaderId == null) {
                LOG.trace(
                        "Skipping fetch request for bucket {} because leader is not available.",
                        bucket);
                // The leader of this bucket is unknown, so ask for fresh metadata of the table
                // (or partition) it belongs to.
                metadataUpdater.updateTableOrPartitionMetadata(tablePath, bucket.getPartitionId());
                continue;
            }

            if (nodesWithPendingFetchRequests.contains(leaderId)) {
                LOG.trace(
                        "Skipping fetch request for bucket {} because previous request "
                                + "to server {} has not been processed.",
                        bucket,
                        leaderId);
                continue;
            }

            PbFetchLogReqForBucket bucketReq =
                    new PbFetchLogReqForBucket()
                            .setBucketId(bucket.getBucket())
                            .setFetchOffset(fetchOffset)
                            .setMaxFetchBytes(maxBucketFetchBytes);
            if (bucket.getPartitionId() != null) {
                bucketReq.setPartitionId(bucket.getPartitionId());
            }

            List<PbFetchLogReqForBucket> reqsForLeader =
                    bucketReqsByLeader.computeIfAbsent(leaderId, key -> new ArrayList<>());
            reqsForLeader.add(bucketReq);
        }

        // An entry is only ever added together with a bucket request, so an empty grouping means
        // that no bucket was ready for fetching.
        if (bucketReqsByLeader.isEmpty()) {
            return Collections.emptyMap();
        }

        // Non-null here: at least one bucket was iterated to populate the grouping above.
        final long requestTableId = tableId;
        Map<Integer, FetchLogRequest> requestsByLeader = new HashMap<>();
        for (Map.Entry<Integer, List<PbFetchLogReqForBucket>> perLeader :
                bucketReqsByLeader.entrySet()) {
            // NOTE(review): -1 presumably marks the request as coming from a client rather than
            // a follower replica - confirm against the server-side handling.
            FetchLogRequest request =
                    new FetchLogRequest()
                            .setFollowerServerId(-1)
                            .setMaxBytes(maxFetchBytes)
                            .setMinBytes(minFetchBytes)
                            .setMaxWaitMs(maxFetchWaitMs);

            PbFetchLogReqForTable tableReq = new PbFetchLogReqForTable().setTableId(requestTableId);
            if (!readContext.isProjectionPushDowned()) {
                tableReq.setProjectionPushdownEnabled(false);
            } else {
                assert projection != null;
                tableReq.setProjectionPushdownEnabled(true)
                        .setProjectedFields(projection.getProjectionInOrder());
            }
            tableReq.addAllBucketsReqs(perLeader.getValue());

            request.addAllTablesReqs(Collections.singletonList(tableReq));
            requestsByLeader.put(perLeader.getKey(), request);
        }
        return requestsByLeader;
    }