public static PredicatePushdownInfo getPredicatePushdownInfo()

in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java [342:424]


        /**
         * Translates a publish-time predicate into a window of ledger positions.
         *
         * <p>A read-only cursor is opened on the topic at {@code PositionImpl.earliest} before the
         * predicate is inspected, and it is closed on every exit path. Pushdown information is only
         * produced when {@code tupleDomain} holds a domain for the publish-time internal column and
         * that domain is made of exactly one range.
         *
         * @param connectorId                  used to build the publish-time column handle for the domain lookup
         * @param tupleDomain                  predicate domains keyed by column handle
         * @param managedLedgerFactory         factory the read-only cursor is opened from
         * @param topicNamePersistenceEncoding name handed to {@code openReadOnlyCursor}
         * @param totalNumEntries              how many entries the cursor skips to locate the end position
         *                                     when the range has no upper bound
         * @return the computed pushdown info, or {@code null} when there are no domains, no publish-time
         *         domain, or the publish-time domain does not consist of exactly one range
         * @throws ManagedLedgerException declared by this method; raised by the managed-ledger calls it makes
         * @throws InterruptedException   declared by this method; raised by the managed-ledger calls it makes
         * @throws ArithmeticException    if the upper bound is open and {@code totalNumEntries} does not fit
         *                                in an {@code int}
         */
        public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId,
                                                                     TupleDomain<ColumnHandle> tupleDomain,
                                                                     ManagedLedgerFactory managedLedgerFactory,
                                                                     String topicNamePersistenceEncoding,
                                                                     long totalNumEntries) throws
                ManagedLedgerException, InterruptedException {

            ReadOnlyCursor cursor = null;
            try {
                cursor = managedLedgerFactory.openReadOnlyCursor(
                        topicNamePersistenceEncoding, PositionImpl.earliest, new ManagedLedgerConfig());

                // No domains at all: nothing to push down.
                if (!tupleDomain.getDomains().isPresent()) {
                    return null;
                }

                Domain publishTimeDomain = tupleDomain.getDomains().get()
                        .get(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(connectorId, false));
                if (publishTimeDomain == null) {
                    return null;
                }

                // TODO support arbitrary number of ranges; for now only a single range is handled
                if (publishTimeDomain.getValues().getRanges().getRangeCount() != 1) {
                    return null;
                }

                checkArgument(publishTimeDomain.getType().isOrderable(), "Domain type must be orderable");

                Range onlyRange = publishTimeDomain.getValues().getRanges().getOrderedRanges().get(0);

                // A null timestamp below means the corresponding side of the range is unbounded.
                Long upperBoundTs = null;
                if (!onlyRange.getHigh().isUpperUnbounded()) {
                    long rawHigh = onlyRange.getHigh().getValueBlock().get().getLong(0, 0);
                    upperBoundTs = new Timestamp(rawHigh).getTime();
                }

                Long lowerBoundTs = null;
                if (!onlyRange.getLow().isLowerUnbounded()) {
                    long rawLow = onlyRange.getLow().getValueBlock().get().getLong(0, 0);
                    lowerBoundTs = new Timestamp(rawLow).getTime();
                }

                // Start: position located for the lower bound; when there is no lower bound, or the
                // lookup yields nothing, the cursor's current read position is used instead.
                PositionImpl startPos = (lowerBoundTs == null) ? null : findPosition(cursor, lowerBoundTs);
                if (startPos == null) {
                    startPos = (PositionImpl) cursor.getReadPosition();
                }

                // End: position located for the upper bound, defaulting to the start position when the
                // lookup yields nothing. With no upper bound, the cursor is advanced by totalNumEntries
                // and its resulting read position is taken.
                PositionImpl endPos;
                if (upperBoundTs != null) {
                    PositionImpl located = findPosition(cursor, upperBoundTs);
                    endPos = (located != null) ? located : startPos;
                } else {
                    cursor.skipEntries(Math.toIntExact(totalNumEntries));
                    endPos = (PositionImpl) cursor.getReadPosition();
                }

                // Both ends are treated as closed regardless of how the predicate was written: presto
                // can always filter out the extra entries afterwards.
                com.google.common.collect.Range<PositionImpl> window =
                        com.google.common.collect.Range.closed(startPos, endPos);

                // One is subtracted from the count the cursor reports for the closed window.
                long entriesInWindow = cursor.getNumberOfEntries(window) - 1;

                PredicatePushdownInfo pushdownInfo =
                        new PredicatePushdownInfo(startPos, endPos, entriesInWindow);
                log.debug("Predicate pushdown optimization calculated: %s", pushdownInfo);
                return pushdownInfo;
            } finally {
                // Runs for the early null returns as well, so the cursor never stays open.
                if (cursor != null) {
                    cursor.close();
                }
            }
        }