in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java [342:424]
/**
 * Translates a single-range predicate on the publish-time internal column into a
 * pair of ledger positions, so that only the entries between them need to be read.
 *
 * <p>A read-only cursor is opened at {@code PositionImpl.earliest} with a default
 * {@link ManagedLedgerConfig} and is always closed before this method returns,
 * whether it returns normally or throws.
 *
 * @param connectorId                  connector id used to build the publish-time column handle
 * @param tupleDomain                  predicate constraints supplied for the query
 * @param managedLedgerFactory         factory used to open the read-only cursor
 * @param topicNamePersistenceEncoding name passed to the factory when opening the cursor
 * @param totalNumEntries              number of entries skipped to reach the end position when
 *                                     the range has no upper bound; converted with
 *                                     {@link Math#toIntExact(long)}, so a value that does not
 *                                     fit in an {@code int} raises {@link ArithmeticException}
 * @return the computed start position, end position and entry count, or {@code null} when
 *         the tuple domain has no domains, has no domain for the publish-time column, or
 *         that domain does not consist of exactly one range
 * @throws ManagedLedgerException   declared by the managed-ledger calls made here
 * @throws InterruptedException     declared by the managed-ledger calls made here
 * @throws IllegalArgumentException if the publish-time domain's type is not orderable
 */
public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId,
        TupleDomain<ColumnHandle> tupleDomain,
        ManagedLedgerFactory managedLedgerFactory,
        String topicNamePersistenceEncoding,
        long totalNumEntries) throws ManagedLedgerException, InterruptedException {
    ReadOnlyCursor cursor = null;
    try {
        cursor = managedLedgerFactory.openReadOnlyCursor(
                topicNamePersistenceEncoding, PositionImpl.earliest, new ManagedLedgerConfig());

        // No constraints at all: nothing to push down.
        if (!tupleDomain.getDomains().isPresent()) {
            return null;
        }

        Domain publishTimeDomain = tupleDomain.getDomains().get()
                .get(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(connectorId, false));

        // TODO support arbitrary number of ranges
        // only worry about one range for now
        if (publishTimeDomain == null
                || publishTimeDomain.getValues().getRanges().getRangeCount() != 1) {
            return null;
        }

        checkArgument(publishTimeDomain.getType().isOrderable(), "Domain type must be orderable");

        Range publishTimeRange = publishTimeDomain.getValues().getRanges().getOrderedRanges().get(0);

        // A null bound means the range is unbounded on that side.
        Long upperBoundTs = publishTimeRange.getHigh().isUpperUnbounded()
                ? null
                : Long.valueOf(new Timestamp(
                        publishTimeRange.getHigh().getValueBlock().get().getLong(0, 0)).getTime());
        Long lowerBoundTs = publishTimeRange.getLow().isLowerUnbounded()
                ? null
                : Long.valueOf(new Timestamp(
                        publishTimeRange.getLow().getValueBlock().get().getLong(0, 0)).getTime());

        // Start position: the lookup result for the lower bound when there is one;
        // otherwise (or when the lookup yields null) the cursor's current read position.
        PositionImpl startPosition = null;
        if (lowerBoundTs != null) {
            startPosition = findPosition(cursor, lowerBoundTs);
        }
        if (startPosition == null) {
            startPosition = (PositionImpl) cursor.getReadPosition();
        }

        // End position: the lookup result for the upper bound when there is one (falling
        // back to the start position when the lookup yields null); otherwise the read
        // position reached after skipping totalNumEntries entries.
        PositionImpl endPosition;
        if (upperBoundTs != null) {
            endPosition = findPosition(cursor, upperBoundTs);
            if (endPosition == null) {
                endPosition = startPosition;
            }
        } else {
            cursor.skipEntries(Math.toIntExact(totalNumEntries));
            endPosition = (PositionImpl) cursor.getReadPosition();
        }

        // Both ends are treated as closed regardless of the predicate's actual bound
        // types: presto can always filter out the extra entries afterwards.
        // The guava Range is fully qualified because the simple name Range already
        // refers to the predicate range type used above.
        com.google.common.collect.Range<PositionImpl> positionWindow =
                com.google.common.collect.Range.closed(startPosition, endPosition);

        // NOTE(review): the cursor's count for the window is reduced by one; the reason
        // is not visible in this method — confirm against getNumberOfEntries semantics.
        long entryCount = cursor.getNumberOfEntries(positionWindow) - 1;

        PredicatePushdownInfo pushdownInfo =
                new PredicatePushdownInfo(startPosition, endPosition, entryCount);
        log.debug("Predicate pushdown optimization calculated: %s", pushdownInfo);
        return pushdownInfo;
    } finally {
        if (cursor != null) {
            cursor.close();
        }
    }
}