in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java [427:449]
/**
 * Finds the newest entry whose message publish time is at or before {@code timestamp}.
 *
 * <p>The lookup is delegated to {@code readOnlyCursor.findNewestMatching} with the
 * {@code SearchAllAvailableEntries} constraint. For each entry handed to the predicate the
 * payload is deserialized into a {@code MessageImpl} and its publish time is compared against
 * {@code timestamp}. An entry that cannot be evaluated (any exception while deserializing or
 * reading the publish time) is logged and reported as non-matching rather than aborting the
 * search.
 *
 * @param readOnlyCursor cursor on which the search is performed
 * @param timestamp upper bound (inclusive) compared against each message's publish time
 * @return whatever {@code findNewestMatching} returns, cast to {@code PositionImpl}
 * @throws ManagedLedgerException declared by this method; presumably raised by the cursor search
 * @throws InterruptedException declared by this method; presumably raised by the cursor search
 */
private static PositionImpl findPosition(ReadOnlyCursor readOnlyCursor, long timestamp)
        throws ManagedLedgerException, InterruptedException {
    final Predicate<Entry> publishedAtOrBefore = new Predicate<Entry>() {
        @Override
        public boolean apply(Entry candidate) {
            // Stays false unless the message is decoded and satisfies the time bound.
            boolean matches = false;
            MessageImpl decoded = null;
            try {
                decoded = MessageImpl.deserialize(candidate.getDataBuffer());
                matches = decoded.getPublishTime() <= timestamp;
            } catch (Exception e) {
                log.error(e, "Failed To deserialize message when finding position with error: %s", e);
            } finally {
                // Always hand the entry back, and the message too if one was created.
                candidate.release();
                if (decoded != null) {
                    decoded.recycle();
                }
            }
            return matches;
        }
    };

    return (PositionImpl) readOnlyCursor.findNewestMatching(SearchAllAvailableEntries, publishedAtOrBefore);
}