in bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java [133:237]
/**
 * Create and initialize the {@link LedgerManagerFactory} to use for the given configuration.
 *
 * <p>Resolution order:
 * <ol>
 *   <li>The configured factory class and ledger root path are taken from the metadata service
 *       uri when one is configured, otherwise from the legacy configuration settings.</li>
 *   <li>If {@code layoutManager} is {@code null}, a {@link FlatLedgerManagerFactory} is
 *       returned.</li>
 *   <li>If no layout is stored yet, a new factory is created via {@code createNewLMFactory}.</li>
 *   <li>If a pre-V2 layout is stored, the factory is chosen from the manager type recorded
 *       in the layout.</li>
 *   <li>Otherwise (V2 layout) the configured factory class is checked against the stored one,
 *       or the stored class is loaded when nothing is configured.</li>
 * </ol>
 *
 * @param conf
 *          configuration to read the metadata service uri / ledger manager settings from
 * @param layoutManager
 *          layout manager used to read the existing layout; may be {@code null}
 * @return an initialized ledger manager factory
 * @throws IOException
 *          if the configuration cannot be read, the ledger root path is empty, the configured
 *          ledger manager does not match the stored layout, or the stored layout names an
 *          unknown ledger manager
 * @throws InterruptedException
 *          declared for the calls made while reading the layout and initializing the factory
 */
public static LedgerManagerFactory newLedgerManagerFactory(
        final AbstractConfiguration<?> conf, LayoutManager layoutManager)
        throws IOException, InterruptedException {
    String metadataServiceUriStr;
    try {
        metadataServiceUriStr = conf.getMetadataServiceUri();
    } catch (ConfigurationException e) {
        log.error("Failed to retrieve metadata service uri from configuration", e);
        throw new IOException(
                "Failed to retrieve metadata service uri from configuration", e);
    }

    Class<? extends LedgerManagerFactory> factoryClass;
    String ledgerRootPath;
    // `metadataServiceUri` can be null when constructing bookkeeper client using an external zookeeper client.
    if (null == metadataServiceUriStr) {
        try {
            factoryClass = conf.getLedgerManagerFactoryClass();
        } catch (ConfigurationException e) {
            log.error("Failed to get ledger manager factory class when using an external zookeeper client", e);
            throw new IOException(
                    "Failed to get ledger manager factory class when using an external zookeeper client", e);
        }
        ledgerRootPath = conf.getZkLedgersRootPath();
    } else {
        // the uri carries both the factory type and the ledger root path
        URI metadataServiceUri = URI.create(metadataServiceUriStr);
        factoryClass = ZKMetadataDriverBase.resolveLedgerManagerFactory(metadataServiceUri);
        ledgerRootPath = metadataServiceUri.getPath();
    }

    if (null == ledgerRootPath || ledgerRootPath.length() == 0) {
        throw new IOException("Empty Ledger Root Path.");
    }

    // if layoutManager is null, return the default ledger manager
    if (layoutManager == null) {
        return new FlatLedgerManagerFactory()
                .initialize(conf, null, FlatLedgerManagerFactory.CUR_VERSION);
    }

    LedgerManagerFactory lmFactory;

    // check that the configured ledger manager is
    // compatible with the existing layout
    LedgerLayout layout = layoutManager.readLedgerLayout();

    if (layout == null) { // no existing layout
        lmFactory = createNewLMFactory(conf, layoutManager, factoryClass);
        return lmFactory
                .initialize(conf, layoutManager, lmFactory.getCurrentVersion());
    }
    if (log.isDebugEnabled()) {
        log.debug("read ledger layout {}", layout);
    }

    // there is existing layout, we need to look into the layout.
    // handle pre V2 layout
    if (layout.getLayoutFormatVersion() <= V1) {
        // pre V2 layout we use type of ledger manager
        String lmType = conf.getLedgerManagerType();
        String layoutType = layout.getManagerFactoryClass();
        if (lmType != null && !layoutType.equals(lmType)) {
            throw new IOException("Configured layout " + lmType
                    + " does not match existing layout " + layoutType);
        }

        // create the ledger manager
        if (FlatLedgerManagerFactory.NAME.equals(layoutType)) {
            lmFactory = new FlatLedgerManagerFactory();
        } else if (HierarchicalLedgerManagerFactory.NAME.equals(layoutType)) {
            lmFactory = new HierarchicalLedgerManagerFactory();
        } else {
            // Report the type stored in the layout: the configured type may be null here,
            // and it is the layout's value that failed to match a known manager.
            throw new IOException("Unknown ledger manager type: " + layoutType);
        }
        return lmFactory.initialize(conf, layoutManager, layout.getManagerVersion());
    }

    // handle V2 layout case
    if (factoryClass != null
            && !isSameLedgerManagerFactory(
                    conf, layout.getManagerFactoryClass(), factoryClass.getName())
            && conf.getProperty(AbstractConfiguration.LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK) == null) {
        // Disable should ONLY happen during compatibility testing.
        throw new IOException("Configured layout " + factoryClass.getName()
                + " does not match existing layout " + layout.getManagerFactoryClass());
    }

    if (factoryClass == null) {
        // no factory specified in configuration: load the class recorded in the layout
        try {
            Class<?> theCls = Class.forName(layout.getManagerFactoryClass());
            if (!LedgerManagerFactory.class.isAssignableFrom(theCls)) {
                throw new IOException("Wrong ledger manager factory " + layout.getManagerFactoryClass());
            }
            factoryClass = theCls.asSubclass(LedgerManagerFactory.class);
        } catch (ClassNotFoundException | IOException e) {
            // both a missing class and a class of the wrong type fall back to the
            // shaded-class resolution, which receives the original failure
            factoryClass = attemptToResolveShadedLedgerManagerFactory(
                    conf,
                    layout.getManagerFactoryClass(),
                    e);
        }
    }

    // instantiate a factory
    lmFactory = ReflectionUtils.newInstance(factoryClass);
    return lmFactory.initialize(conf, layoutManager, layout.getManagerVersion());
}