in bookkeeper-server/src/main/java/org/apache/bookkeeper/server/EmbeddedServer.java [282:503]
public EmbeddedServer build() throws Exception {
final ComponentInfoPublisher componentInfoPublisher = new ComponentInfoPublisher();
final Supplier<BookieServiceInfo> bookieServiceInfoProvider =
() -> buildBookieServiceInfo(componentInfoPublisher);
LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack
.newBuilder()
.withComponentInfoPublisher(componentInfoPublisher)
.withName("bookie-server");
// 1. build stats provider
if (statsProvider == null) {
StatsProviderService statsProviderService = new StatsProviderService(conf);
statsProvider = statsProviderService.getStatsProvider();
serverBuilder.addComponent(statsProviderService);
log.info("Load lifecycle component : {}", statsProviderService.getName());
}
StatsLogger rootStatsLogger = statsProvider.getStatsLogger("");
// 2. Build metadata driver
if (metadataDriver == null) {
if (ledgerManagerFactory == null || registrationManager == null) {
metadataDriver = BookieResources.createMetadataDriver(conf.getServerConf(), rootStatsLogger);
serverBuilder.addComponent(new AutoCloseableLifecycleComponent("metadataDriver", metadataDriver));
}
}
if (registrationManager == null) {
registrationManager = metadataDriver.createRegistrationManager();
serverBuilder.addComponent(
new AutoCloseableLifecycleComponent("registrationManager", registrationManager));
}
// 3. Build ledger manager
if (ledgerManagerFactory == null) {
ledgerManagerFactory = metadataDriver.getLedgerManagerFactory();
serverBuilder.addComponent(new AutoCloseableLifecycleComponent("lmFactory", ledgerManagerFactory));
}
LedgerManager ledgerManager = ledgerManagerFactory.newLedgerManager();
serverBuilder.addComponent(new AutoCloseableLifecycleComponent("ledgerManager", ledgerManager));
// 4. Build bookie
StatsLogger bookieStats = rootStatsLogger.scope(BOOKIE_SCOPE);
if (diskChecker == null) {
diskChecker = BookieResources.createDiskChecker(conf.getServerConf());
}
if (ledgerDirsManager == null) {
ledgerDirsManager = BookieResources.createLedgerDirsManager(
conf.getServerConf(), diskChecker, bookieStats.scope(LD_LEDGER_SCOPE));
}
if (indexDirsManager == null) {
indexDirsManager = BookieResources.createIndexDirsManager(
conf.getServerConf(), diskChecker, bookieStats.scope(LD_INDEX_SCOPE), ledgerDirsManager);
}
ByteBufAllocatorWithOomHandler allocatorWithOomHandler;
if (allocator == null) {
allocatorWithOomHandler = BookieResources.createAllocator(conf.getServerConf());
allocator = allocatorWithOomHandler;
} else {
if (allocator instanceof ByteBufAllocatorWithOomHandler) {
allocatorWithOomHandler = (ByteBufAllocatorWithOomHandler) allocator;
} else {
allocatorWithOomHandler = new ByteBuffAllocatorWrapper(allocator);
}
}
if (uncleanShutdownDetection == null) {
uncleanShutdownDetection = new UncleanShutdownDetectionImpl(ledgerDirsManager);
}
if (uncleanShutdownDetection.lastShutdownWasUnclean()) {
log.info("Unclean shutdown detected. "
+ "The bookie did not register a graceful shutdown prior to this boot.");
}
// bookie takes ownership of storage, so shuts it down
LedgerStorage storage = null;
DataIntegrityCheck integCheck = null;
if (conf.getServerConf().isDataIntegrityCheckingEnabled()) {
StatsLogger clientStats = bookieStats.scope(CLIENT_SCOPE);
ClientConfiguration clientConfiguration = new ClientConfiguration(conf.getServerConf());
clientConfiguration.setClientRole(ClientConfiguration.CLIENT_ROLE_SYSTEM);
BookKeeper bkc = BookKeeper.forConfig(clientConfiguration).statsLogger(clientStats).build();
serverBuilder.addComponent(new AutoCloseableLifecycleComponent("bkc", bkc));
BookieId bookieId = BookieImpl.getBookieId(conf.getServerConf());
ExecutorService rxExecutor = Executors.newFixedThreadPool(
2, new ThreadFactoryBuilder().setNameFormat("rx-schedule-%d")
.setUncaughtExceptionHandler(
(t, ex) -> log.error("Uncaught exception on thread {}", t.getName(), ex))
.build());
Scheduler rxScheduler = Schedulers.from(rxExecutor);
serverBuilder.addComponent(
new RxSchedulerLifecycleComponent("rx-scheduler", conf, bookieStats,
rxScheduler, rxExecutor));
storage = BookieResources.createLedgerStorage(conf.getServerConf(), ledgerManager,
ledgerDirsManager, indexDirsManager, bookieStats, allocator);
EntryCopier copier = new EntryCopierImpl(bookieId,
((org.apache.bookkeeper.client.BookKeeper) bkc).getClientCtx().getBookieClient(),
storage, Ticker.systemTicker());
integCheck = new DataIntegrityCheckImpl(bookieId,
ledgerManager, storage, copier,
new BookKeeperAdmin(bkc, clientStats, clientConfiguration),
rxScheduler);
// if we're running with journal writes disabled and an unclean shutdown occurred then
// run the preboot check to protect against data loss and to perform data repair
if (!conf.getServerConf().getJournalWriteData()
&& uncleanShutdownDetection.lastShutdownWasUnclean()) {
integCheck.runPreBootCheck("UNCLEAN_SHUTDOWN");
}
CookieValidation cookieValidation = new DataIntegrityCookieValidation(conf.getServerConf(),
registrationManager, integCheck);
cookieValidation.checkCookies(storageDirectoriesFromConf(conf.getServerConf()));
} else {
CookieValidation cookieValidation = newLegacyCookieValidation(conf.getServerConf(),
registrationManager);
cookieValidation.checkCookies(storageDirectoriesFromConf(conf.getServerConf()));
// storage should be created after legacy validation or it will fail (it would find ledger dirs)
storage = BookieResources.createLedgerStorage(conf.getServerConf(), ledgerManager,
ledgerDirsManager, indexDirsManager, bookieStats, allocator);
}
Bookie bookie;
if (conf.getServerConf().isForceReadOnlyBookie()) {
bookie = new ReadOnlyBookie(conf.getServerConf(), registrationManager, storage,
diskChecker,
ledgerDirsManager, indexDirsManager,
bookieStats, allocator,
bookieServiceInfoProvider);
} else {
bookie = newBookieImpl(conf.getServerConf(), registrationManager, storage,
diskChecker,
ledgerDirsManager, indexDirsManager,
bookieStats, allocator,
bookieServiceInfoProvider);
}
// 5. build bookie server
BookieService bookieService =
new BookieService(conf, bookie, rootStatsLogger, allocatorWithOomHandler, uncleanShutdownDetection);
serverBuilder.addComponent(bookieService);
log.info("Load lifecycle component : {}", bookieService.getName());
if (conf.getServerConf().isLocalScrubEnabled()) {
serverBuilder.addComponent(
new ScrubberService(
rootStatsLogger.scope(ScrubberStats.SCOPE),
conf, bookieService.getServer().getBookie().getLedgerStorage()));
}
// 6. build auto recovery
AutoRecoveryService autoRecoveryService = null;
if (conf.getServerConf().isAutoRecoveryDaemonEnabled()) {
autoRecoveryService = new AutoRecoveryService(conf, rootStatsLogger.scope(REPLICATION_SCOPE));
serverBuilder.addComponent(autoRecoveryService);
log.info("Load lifecycle component : {}", autoRecoveryService.getName());
}
// 7. build data integrity check service
DataIntegrityService dataIntegrityService = null;
if (conf.getServerConf().isDataIntegrityCheckingEnabled()) {
checkNotNull(integCheck, "integCheck should have been initialized with the cookie validation");
dataIntegrityService =
new DataIntegrityService(conf, rootStatsLogger.scope(REPLICATION_SCOPE), integCheck);
serverBuilder.addComponent(dataIntegrityService);
log.info("Load lifecycle component : {}", dataIntegrityService.getName());
}
// 8. build http service
HttpService httpService = null;
if (conf.getServerConf().isHttpServerEnabled()) {
BKHttpServiceProvider provider = new BKHttpServiceProvider.Builder()
.setBookieServer(bookieService.getServer())
.setServerConfiguration(conf.getServerConf())
.setStatsProvider(statsProvider)
.setLedgerManagerFactory(ledgerManagerFactory)
.build();
httpService = new HttpService(provider, conf, rootStatsLogger);
serverBuilder.addComponent(httpService);
log.info("Load lifecycle component : {}", httpService.getName());
}
// 9. build extra services
String[] extraComponents = conf.getServerConf().getExtraServerComponents();
if (null != extraComponents) {
try {
List<ServerLifecycleComponent> components = loadServerComponents(
extraComponents,
conf,
rootStatsLogger);
for (ServerLifecycleComponent component : components) {
serverBuilder.addComponent(component);
log.info("Load lifecycle component : {}", component.getName());
}
} catch (Exception e) {
if (conf.getServerConf().getIgnoreExtraServerComponentsStartupFailures()) {
log.info("Failed to load extra components '{}' - {}. Continuing without those components.",
StringUtils.join(extraComponents), e.getMessage());
} else {
throw e;
}
}
}
return new EmbeddedServer(serverBuilder.build(), statsProvider, registrationManager, ledgerManagerFactory,
diskChecker, ledgerDirsManager, indexDirsManager, bookieService, autoRecoveryService,
dataIntegrityService, httpService);
}