in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java [184:229]
/**
 * Resolves the partition indexes of a partitioned topic that have to be scanned for a query.
 *
 * <p>The partition count is read through the Pulsar admin client. When the tuple domain holds a
 * constraint on the internal partition column, only the partitions covered by one of the
 * constraint's ranges are returned; without such a constraint every partition is returned.
 *
 * @param topicName the partitioned topic to inspect
 * @param tupleDomain the predicate handed to the connector for this query
 * @return ascending, duplicate-free partition indexes, each within {@code [0, numPartitions)}
 * @throws PrestoException with {@code QUERY_REJECTED} when the admin call fails with status 401
 * @throws RuntimeException when the partition metadata cannot be fetched for any other reason
 */
private List<Integer> getPredicatedPartitions(TopicName topicName, TupleDomain<ColumnHandle> tupleDomain) {
    int numPartitions;
    try {
        numPartitions = (this.pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString())).partitions;
    } catch (PulsarAdminException e) {
        if (e.getStatusCode() == 401) {
            throw new PrestoException(QUERY_REJECTED,
                    String.format("Failed to get metadata for partitioned topic %s: Unauthorized", topicName));
        }
        // Guard against a null root cause so the original failure is never masked by an NPE.
        Throwable rootCause = ExceptionUtils.getRootCause(e);
        String detail = (rootCause != null ? rootCause : e).getLocalizedMessage();
        throw new RuntimeException("Failed to get metadata for partitioned topic "
                + topicName + ": " + detail, e);
    }

    Domain domain = tupleDomain.getDomains().isPresent()
            ? tupleDomain.getDomains().get().get(PulsarInternalColumn.PARTITION
                    .getColumnHandle(connectorId, false))
            : null;

    if (domain == null) {
        // No constraint on the partition column: every partition has to be read.
        List<Integer> allPartitions = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            allPartitions.add(i);
        }
        return allPartitions;
    }

    // One flag per existing partition. Marking instead of appending keeps the result free of
    // duplicates when two ranges touch the same endpoint, and makes out-of-range writes impossible
    // once the bounds below are clamped.
    boolean[] selected = new boolean[numPartitions];
    domain.getValues().getValuesProcessor().consume(
            ranges -> ranges.getOrderedRanges().forEach(range -> {
                // Valid partition indexes are 0 .. numPartitions - 1 (inclusive).
                int low = 0;
                int high = numPartitions - 1;
                if (!range.getLow().isLowerUnbounded() && range.getLow().getValueBlock().isPresent()) {
                    low = Math.max(low, range.getLow().getValueBlock().get().getInt(0, 0));
                }
                if (!range.getHigh().isUpperUnbounded() && range.getHigh().getValueBlock().isPresent()) {
                    high = Math.min(high, range.getHigh().getValueBlock().get().getInt(0, 0));
                }
                // NOTE(review): the marker's bound (exclusive vs. inclusive) is not inspected, so an
                // exclusive endpoint is still selected. This over-selects but never drops a partition
                // that matches the predicate.
                for (int i = low; i <= high; i++) {
                    selected[i] = true;
                }
            }),
            // Domains that are not expressed as ranges select nothing here.
            discreteValues -> {},
            allOrNone -> {});

    List<Integer> predicatePartitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
        if (selected[i]) {
            predicatePartitions.add(i);
        }
    }
    return predicatePartitions;
}