in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java [103:130]
public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit,
PulsarConnectorConfig pulsarConnectorConfig) {
this.splitSize = pulsarSplit.getSplitSize();
// Set start time for split
this.startTime = System.nanoTime();
PulsarConnectorCache pulsarConnectorCache;
try {
pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig);
} catch (Exception e) {
log.error(e, "Failed to initialize Pulsar connector cache");
close();
throw new RuntimeException(e);
}
OffloadPolicies offloadPolicies = pulsarSplit.getOffloadPolicies();
if (offloadPolicies != null) {
offloadPolicies.setOffloadersDirectory(pulsarConnectorConfig.getOffloadersDirectory());
offloadPolicies.setManagedLedgerOffloadMaxThreads(
pulsarConnectorConfig.getManagedLedgerOffloadMaxThreads());
}
initialize(columnHandles, pulsarSplit, pulsarConnectorConfig,
pulsarConnectorCache.getManagedLedgerFactory(),
pulsarConnectorCache.getManagedLedgerConfig(
TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()),
pulsarSplit.getTableName()).getNamespaceObject(), offloadPolicies,
pulsarConnectorConfig),
new PulsarConnectorMetricsTracker(pulsarConnectorCache.getStatsProvider()));
}