private List getPredicatedPartitions()

in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java [184:229]


    private List<Integer> getPredicatedPartitions(TopicName topicName, TupleDomain<ColumnHandle> tupleDomain) {
        int numPartitions;
        try {
            numPartitions = (this.pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString())).partitions;
        } catch (PulsarAdminException e) {
            if (e.getStatusCode() == 401) {
                throw new PrestoException(QUERY_REJECTED,
                    String.format("Failed to get metadata for partitioned topic %s: Unauthorized", topicName));
            }

            throw new RuntimeException("Failed to get metadata for partitioned topic "
                + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
        }
        List<Integer> predicatePartitions = new ArrayList<>();
        if (tupleDomain.getDomains().isPresent()) {
            Domain domain = tupleDomain.getDomains().get().get(PulsarInternalColumn.PARTITION
                .getColumnHandle(connectorId, false));
            if (domain != null) {
                domain.getValues().getValuesProcessor().consume(
                    ranges -> domain.getValues().getRanges().getOrderedRanges().forEach(range -> {
                        Integer low = 0;
                        Integer high = numPartitions;
                        if (!range.getLow().isLowerUnbounded() && range.getLow().getValueBlock().isPresent()) {
                            low = range.getLow().getValueBlock().get().getInt(0, 0);
                        }
                        if (!range.getHigh().isLowerUnbounded() && range.getHigh().getValueBlock().isPresent()) {
                            high = range.getHigh().getValueBlock().get().getInt(0, 0);
                        }
                        for (int i = low; i <= high; i++) {
                            predicatePartitions.add(i);
                        }
                    }),
                    discreteValues -> {},
                    allOrNone -> {});
            } else {
                for (int i = 0; i < numPartitions; i++) {
                    predicatePartitions.add(i);
                }
            }
        } else {
            for (int i = 0; i < numPartitions; i++) {
                predicatePartitions.add(i);
            }
        }
        return predicatePartitions;
    }