in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java [141:172]
/**
 * Populates this cursor's per-split state and opens the managed-ledger cursor for the split.
 *
 * <p>Stores the column handles, split and connector config; derives the partition index and the
 * {@code persistent} topic name from the split's schema (namespace) and table names; allocates the
 * message and entry queues with the capacities configured in {@code pulsarConnectorConfig};
 * creates the schema handler; and finally obtains the cursor positioned at the split's start
 * position.
 *
 * @param columnHandles                 columns requested for this split
 * @param pulsarSplit                   split describing the topic, schema info and start position
 * @param pulsarConnectorConfig         connector configuration (batch size, queue sizes, offload driver)
 * @param managedLedgerFactory          factory passed through to {@code getCursor}
 * @param managedLedgerConfig           managed-ledger configuration passed through to {@code getCursor}
 * @param pulsarConnectorMetricsTracker metrics tracker stored for later use by this cursor
 * @throws RuntimeException wrapping the {@code ManagedLedgerException} or
 *                          {@code InterruptedException} raised while obtaining the cursor; this
 *                          cursor is closed before the exception propagates
 */
private void initialize(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit,
                        PulsarConnectorConfig pulsarConnectorConfig,
                        ManagedLedgerFactory managedLedgerFactory,
                        ManagedLedgerConfig managedLedgerConfig,
                        PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
    this.columnHandles = columnHandles;
    this.pulsarSplit = pulsarSplit;
    this.partition = TopicName.getPartitionIndex(pulsarSplit.getTableName());
    this.pulsarConnectorConfig = pulsarConnectorConfig;
    this.maxBatchSize = pulsarConnectorConfig.getMaxEntryReadBatchSize();
    this.messageQueue = new SpscArrayQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
    this.entryQueue = new SpscArrayQueue<>(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
    this.topicName = TopicName.get("persistent",
            NamespaceName.get(pulsarSplit.getSchemaName()),
            pulsarSplit.getTableName());
    this.metricsTracker = pulsarConnectorMetricsTracker;
    // Offloaded data is read only when an offload driver has been configured.
    this.readOffloaded = pulsarConnectorConfig.getManagedLedgerOffloadDriver() != null;
    this.schemaHandler = PulsarSchemaHandlers
            .newPulsarSchemaHandler(this.topicName,
                    this.pulsarConnectorConfig, pulsarSplit.getSchemaInfo(), columnHandles);
    log.info("Initializing split with parameters: %s", pulsarSplit);
    try {
        // Reuse the topic name computed above instead of rebuilding it from the same inputs.
        this.cursor = getCursor(this.topicName, pulsarSplit.getStartPosition(),
                managedLedgerFactory, managedLedgerConfig);
    } catch (ManagedLedgerException | InterruptedException e) {
        log.error(e, "Failed to get read only cursor");
        try {
            close();
        } finally {
            // Restore the interrupt flag after cleanup so callers up the stack can still
            // observe the interruption even though it is rethrown as an unchecked exception.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
        throw new RuntimeException(e);
    }
}