in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java [149:182]
/**
 * Collects the splits for a partitioned topic, one batch per partition returned by
 * {@code getPredicatedPartitions(topicName, tupleDomain)}.
 *
 * <p>The split budget is {@code max(numSplits, partitionCount)}, so every selected partition
 * is asked for at least one split. The budget is divided evenly between the partitions; when
 * it does not divide evenly, the first {@code budget % partitionCount} partitions are each
 * asked for one extra split.
 *
 * @param numSplits       requested number of splits across all partitions
 * @param topicName       the partitioned topic; individual partitions are resolved through
 *                        {@code topicName.getPartition(index)}
 * @param tableHandle     passed through unchanged to {@code getSplitsForTopic}
 * @param schemaInfo      passed through unchanged to {@code getSplitsForTopic}
 * @param tupleDomain     used to select partitions and passed through to
 *                        {@code getSplitsForTopic}
 * @param offloadPolicies passed through unchanged to {@code getSplitsForTopic}
 * @return the splits of all selected partitions, in partition-list order; empty when no
 *         partition was selected
 * @throws Exception if partition selection or split generation for a partition fails
 */
Collection<PulsarSplit> getSplitsPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle
        tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain,
        OffloadPolicies offloadPolicies) throws Exception {
    List<Integer> predicatedPartitions = getPredicatedPartitions(topicName, tupleDomain);
    if (log.isDebugEnabled()) {
        log.debug("Partition filter result %s", predicatedPartitions);
    }

    List<PulsarSplit> splits = new LinkedList<>();
    int partitionCount = predicatedPartitions.size();
    if (partitionCount == 0) {
        // Nothing was selected: return early, otherwise the division and modulo below
        // would throw ArithmeticException (division by zero).
        return splits;
    }

    int actualNumSplits = Math.max(partitionCount, numSplits);
    int splitsPerPartition = actualNumSplits / partitionCount;
    int splitRemainder = actualNumSplits % partitionCount;

    ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig)
            .getManagedLedgerFactory();

    for (int i = 0; i < partitionCount; i++) {
        // The first 'splitRemainder' partitions absorb the leftover splits, one each.
        int splitsForThisPartition = (splitRemainder > i) ? splitsPerPartition + 1 : splitsPerPartition;
        // Resolve the partition's topic name once and reuse it for both derived names.
        TopicName partitionTopic = topicName.getPartition(predicatedPartitions.get(i));
        splits.addAll(
                getSplitsForTopic(
                        partitionTopic.getPersistenceNamingEncoding(),
                        managedLedgerFactory,
                        splitsForThisPartition,
                        tableHandle,
                        schemaInfo,
                        partitionTopic.getLocalName(),
                        tupleDomain,
                        offloadPolicies));
    }
    return splits;
}