in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java [150:182]
private LedgerOffloader initManagedLedgerOffloader(OffloadPolicies offloadPolicies,
PulsarConnectorConfig pulsarConnectorConfig) {
try {
if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) {
checkNotNull(offloadPolicies.getOffloadersDirectory(),
"Offloader driver is configured to be '%s' but no offloaders directory is configured.",
offloadPolicies.getManagedLedgerOffloadDriver());
this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory(),
pulsarConnectorConfig.getNarExtractionDirectory());
LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
offloadPolicies.getManagedLedgerOffloadDriver());
try {
return offloaderFactory.create(
offloadPolicies,
ImmutableMap.of(
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
),
getOffloaderScheduler(offloadPolicies));
} catch (IOException ioe) {
log.error("Failed to create offloader: ", ioe);
throw new RuntimeException(ioe.getMessage(), ioe.getCause());
}
} else {
log.info("No ledger offloader configured, using NULL instance");
return NullLedgerOffloader.INSTANCE;
}
} catch (Throwable t) {
throw new RuntimeException(t);
}
}