Collection getSplitsPartitionedTopic()

in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java [149:182]


    Collection<PulsarSplit> getSplitsPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle
            tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain,
              OffloadPolicies offloadPolicies) throws Exception {

        List<Integer> predicatedPartitions = getPredicatedPartitions(topicName, tupleDomain);
        if (log.isDebugEnabled()) {
            log.debug("Partition filter result %s", predicatedPartitions);
        }

        int actualNumSplits = Math.max(predicatedPartitions.size(), numSplits);

        int splitsPerPartition = actualNumSplits / predicatedPartitions.size();

        int splitRemainder = actualNumSplits % predicatedPartitions.size();

        ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig)
                .getManagedLedgerFactory();

        List<PulsarSplit> splits = new LinkedList<>();
        for (int i = 0; i < predicatedPartitions.size(); i++) {
            int splitsForThisPartition = (splitRemainder > i) ? splitsPerPartition + 1 : splitsPerPartition;
            splits.addAll(
                getSplitsForTopic(
                    topicName.getPartition(predicatedPartitions.get(i)).getPersistenceNamingEncoding(),
                    managedLedgerFactory,
                    splitsForThisPartition,
                    tableHandle,
                    schemaInfo,
                    topicName.getPartition(predicatedPartitions.get(i)).getLocalName(),
                    tupleDomain,
                    offloadPolicies));
        }
        return splits;
    }