in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java [250:328]
/**
 * Builds the splits that cover the entries of a single topic.
 *
 * <p>A read-only cursor is first opened at the earliest position to count the entries of the
 * topic. If predicate pushdown information is available, its entry count and start position
 * replace the cursor-derived ones. The cursor is then reopened at the chosen start position
 * and advanced split by split, so that each split records the ledger/entry ids of the
 * positions at which it starts and ends.
 *
 * @param topicNamePersistenceEncoding name of the topic, as passed to the managed ledger factory
 * @param managedLedgerFactory factory used to open the read-only cursors
 * @param numSplits number of splits to create; entries are divided evenly and the first
 *        {@code numEntries % numSplits} splits receive one extra entry. A value of zero
 *        causes an {@link ArithmeticException} once the topic has entries.
 * @param tableHandle table handle whose schema name is recorded in each split
 * @param schemaInfo schema whose name, definition, type and properties are recorded in each split
 * @param tableName table name recorded in each split
 * @param tupleDomain constraint used to look up predicate pushdown info and passed to each split
 * @param offloadPolicies offload policies passed to each split
 * @return an empty collection when the cursor reports no entries, otherwise the created splits
 * @throws ManagedLedgerException if a managed ledger operation fails
 * @throws InterruptedException if a managed ledger operation is interrupted
 * @throws IOException e.g. if the schema properties cannot be serialized to JSON
 */
Collection<PulsarSplit> getSplitsForTopic(String topicNamePersistenceEncoding,
                                          ManagedLedgerFactory managedLedgerFactory,
                                          int numSplits,
                                          PulsarTableHandle tableHandle,
                                          SchemaInfo schemaInfo, String tableName,
                                          TupleDomain<ColumnHandle> tupleDomain,
                                          OffloadPolicies offloadPolicies)
        throws ManagedLedgerException, InterruptedException, IOException {
    ReadOnlyCursor readOnlyCursor = null;
    try {
        readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
                topicNamePersistenceEncoding,
                PositionImpl.earliest, new ManagedLedgerConfig());
        long numEntries = readOnlyCursor.getNumberOfEntries();
        if (numEntries <= 0) {
            // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
            return Collections.emptyList();
        }

        PredicatePushdownInfo predicatePushdownInfo = PredicatePushdownInfo.getPredicatePushdownInfo(
                this.connectorId,
                tupleDomain,
                managedLedgerFactory,
                topicNamePersistenceEncoding,
                numEntries);

        PositionImpl initialStartPosition;
        if (predicatePushdownInfo != null) {
            numEntries = predicatePushdownInfo.getNumOfEntries();
            initialStartPosition = predicatePushdownInfo.getStartPosition();
        } else {
            initialStartPosition = (PositionImpl) readOnlyCursor.getReadPosition();
        }

        readOnlyCursor.close();
        // Drop the reference to the closed cursor so that the finally block does not close it
        // a second time if reopening below fails.
        readOnlyCursor = null;
        readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
                topicNamePersistenceEncoding,
                initialStartPosition, new ManagedLedgerConfig());

        long remainder = numEntries % numSplits;
        long avgEntriesPerSplit = numEntries / numSplits;

        List<PulsarSplit> splits = new LinkedList<>();
        if (numSplits > 0) {
            // These values are identical for every split, so compute them once. They are only
            // computed when at least one split will be built.
            String schemaName = restoreNamespaceDelimiterIfNeeded(
                    tableHandle.getSchemaName(), pulsarConnectorConfig);
            String schemaDefinition = new String(
                    schemaInfo.getSchema(), java.nio.charset.StandardCharsets.ISO_8859_1);
            String schemaProperties = objectMapper.writeValueAsString(schemaInfo.getProperties());

            for (int i = 0; i < numSplits; i++) {
                long entriesForSplit = (remainder > i) ? avgEntriesPerSplit + 1 : avgEntriesPerSplit;
                PositionImpl startPosition = (PositionImpl) readOnlyCursor.getReadPosition();
                // toIntExact throws ArithmeticException rather than silently truncating when
                // the entry count does not fit in an int.
                readOnlyCursor.skipEntries(Math.toIntExact(entriesForSplit));
                PositionImpl endPosition = (PositionImpl) readOnlyCursor.getReadPosition();

                PulsarSplit pulsarSplit = new PulsarSplit(i, this.connectorId,
                        schemaName,
                        schemaInfo.getName(),
                        tableName,
                        entriesForSplit,
                        schemaDefinition,
                        schemaInfo.getType(),
                        startPosition.getEntryId(),
                        endPosition.getEntryId(),
                        startPosition.getLedgerId(),
                        endPosition.getLedgerId(),
                        tupleDomain,
                        schemaProperties,
                        offloadPolicies);
                splits.add(pulsarSplit);
            }
        }
        return splits;
    } finally {
        if (readOnlyCursor != null) {
            try {
                readOnlyCursor.close();
            } catch (Exception e) {
                // Best-effort cleanup: a failure to close must not mask the primary outcome.
                log.error(e);
            }
        }
    }
}