in stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java [205:417]
public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
int grpcPort,
boolean useHostname,
boolean startBookieAndStartProvider,
StatsLogger externalStatsLogger)
throws Exception {
final ComponentInfoPublisher componentInfoPublisher = new ComponentInfoPublisher();
final Supplier<BookieServiceInfo> bookieServiceInfoProvider =
() -> buildBookieServiceInfo(componentInfoPublisher);
LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder()
.withName("storage-server")
.withComponentInfoPublisher(componentInfoPublisher);
BookieConfiguration bkConf = BookieConfiguration.of(conf);
bkConf.validate();
DLConfiguration dlConf = DLConfiguration.of(conf);
dlConf.validate();
StorageServerConfiguration serverConf = StorageServerConfiguration.of(conf);
serverConf.validate();
StorageConfiguration storageConf = new StorageConfiguration(conf);
storageConf.validate();
// Get my local endpoint
Endpoint myEndpoint = createLocalEndpoint(grpcPort, useHostname);
// Create shared resources
StorageResources storageResources = StorageResources.create();
// Create the stats provider
StatsLogger rootStatsLogger;
StatsProviderService statsProviderService = null;
if (startBookieAndStartProvider) {
statsProviderService = new StatsProviderService(bkConf);
rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");
serverBuilder.addComponent(statsProviderService);
log.info("Bookie configuration : {}", bkConf.asJson());
} else {
rootStatsLogger = checkNotNull(externalStatsLogger,
"External stats logger is not provided while not starting stats provider");
}
// dump configurations
log.info("Dlog configuration : {}", dlConf.asJson());
log.info("Storage configuration : {}", storageConf.asJson());
log.info("Server configuration : {}", serverConf.asJson());
// Create the bookie service
ServerConfiguration bkServerConf;
if (startBookieAndStartProvider) {
BookieService bookieService = new BookieService(bkConf, rootStatsLogger, bookieServiceInfoProvider);
serverBuilder.addComponent(bookieService);
bkServerConf = bookieService.serverConf();
// Build http service
if (bkServerConf.isHttpServerEnabled()) {
MetadataBookieDriver metadataDriver = BookieResources.createMetadataDriver(bkServerConf,
rootStatsLogger);
serverBuilder.addComponent(new AutoCloseableLifecycleComponent("metadataDriver",
metadataDriver));
LedgerManagerFactory ledgerManagerFactory = metadataDriver.getLedgerManagerFactory();
serverBuilder.addComponent(new AutoCloseableLifecycleComponent("lmFactory",
ledgerManagerFactory));
BKHttpServiceProvider provider = new BKHttpServiceProvider.Builder()
.setBookieServer(bookieService.getServer())
.setServerConfiguration(bkServerConf)
.setStatsProvider(statsProviderService.getStatsProvider())
.setLedgerManagerFactory(ledgerManagerFactory)
.build();
HttpService httpService =
new HttpService(provider,
new org.apache.bookkeeper.server.conf.BookieConfiguration(bkServerConf),
rootStatsLogger);
serverBuilder.addComponent(httpService);
log.info("Load lifecycle component : {}", HttpService.class.getName());
}
} else {
bkServerConf = new ServerConfiguration();
bkServerConf.loadConf(bkConf.getUnderlyingConf());
}
// Create the bookie watch service
BookieWatchService bkWatchService;
{
DistributedLogConfiguration dlogConf = new DistributedLogConfiguration();
dlogConf.loadConf(dlConf);
bkWatchService = new BookieWatchService(
dlogConf.getEnsembleSize(),
bkConf,
NullStatsLogger.INSTANCE);
}
// Create the curator provider service
CuratorProviderService curatorProviderService = new CuratorProviderService(
bkServerConf, dlConf, rootStatsLogger.scope("curator"));
// Create the distributedlog namespace service
DLNamespaceProviderService dlNamespaceProvider = new DLNamespaceProviderService(
bkServerConf,
dlConf,
rootStatsLogger.scope("dlog"));
// client settings for the proxy channels
StorageClientSettings proxyClientSettings = StorageClientSettings.newBuilder()
.serviceUri("bk://localhost:" + grpcPort)
.build();
// Create range (stream) store
StorageContainerStoreBuilder storageContainerStoreBuilder = StorageContainerStoreBuilder.newBuilder()
.withStatsLogger(rootStatsLogger.scope("storage"))
.withStorageConfiguration(storageConf)
// the storage resources shared across multiple components
.withStorageResources(storageResources)
// the placement policy
.withStorageContainerPlacementPolicyFactory(() -> {
long numStorageContainers;
try (ZkClusterMetadataStore store = new ZkClusterMetadataStore(
curatorProviderService.get(),
ZKMetadataDriverBase.resolveZkServers(bkServerConf),
ZK_METADATA_ROOT_PATH)) {
numStorageContainers = store.getClusterMetadata().getNumStorageContainers();
}
return StorageContainerPlacementPolicyImpl.of((int) numStorageContainers);
})
// the default log backend uri
.withDefaultBackendUri(dlNamespaceProvider.getDlogUri())
// with zk-based storage container manager
.withStorageContainerManagerFactory((storeConf, registry) ->
new ZkStorageContainerManager(
myEndpoint,
storageConf,
new ZkClusterMetadataStore(
curatorProviderService.get(),
ZKMetadataDriverBase.resolveZkServers(bkServerConf),
ZK_METADATA_ROOT_PATH),
registry,
rootStatsLogger.scope("sc").scope("manager")))
// with the inter storage container client manager
.withRangeStoreFactory(
new MVCCStoreFactoryImpl(
dlNamespaceProvider,
() -> new DLCheckpointStore(dlNamespaceProvider.get()),
storageConf.getRangeStoreDirs(),
storageResources,
storageConf.getServeReadOnlyTables(), storageConf))
// with client manager for proxying grpc requests
.withStorageServerClientManager(() -> new StorageServerClientManagerImpl(
proxyClientSettings,
storageResources.scheduler(),
StorageServerChannel.factory(proxyClientSettings)
// intercept the channel to attach routing header
.andThen(channel -> channel.intercept(new RoutingHeaderProxyInterceptor()))
));
StorageService storageService = new StorageService(
storageConf, storageContainerStoreBuilder, rootStatsLogger.scope("storage"));
// Create gRPC server
StatsLogger rpcStatsLogger = rootStatsLogger.scope("grpc");
GrpcServerSpec serverSpec = GrpcServerSpec.builder()
.storeSupplier(storageService)
.storeServerConf(serverConf)
.endpoint(myEndpoint)
.statsLogger(rpcStatsLogger)
.build();
GrpcService grpcService = new GrpcService(
serverConf, serverSpec, rpcStatsLogger);
// Create a registration service provider
RegistrationServiceProvider regService = new RegistrationServiceProvider(
bkServerConf,
dlConf,
rootStatsLogger.scope("registration").scope("provider"));
// Create a registration state service only when service is ready.
RegistrationStateService regStateService = new RegistrationStateService(
myEndpoint,
bkServerConf,
bkConf,
regService,
rootStatsLogger.scope("registration"));
// Create a cluster controller service
ClusterControllerService clusterControllerService = new ClusterControllerService(
storageConf,
() -> new ClusterControllerImpl(
new ZkClusterMetadataStore(
curatorProviderService.get(),
ZKMetadataDriverBase.resolveZkServers(bkServerConf),
ZK_METADATA_ROOT_PATH),
regService.get(),
new DefaultStorageContainerController(),
new ZkClusterControllerLeaderSelector(curatorProviderService.get(), ZK_METADATA_ROOT_PATH),
storageConf),
rootStatsLogger.scope("cluster_controller"));
// Create all the service stack
return serverBuilder
.addComponent(bkWatchService) // service that watches bookies
.addComponent(curatorProviderService) // service that provides curator client
.addComponent(dlNamespaceProvider) // service that provides dl namespace
.addComponent(storageService) // range (stream) store
.addComponent(grpcService) // range (stream) server (gRPC)
.addComponent(regService) // service that provides registration client
.addComponent(regStateService) // service that manages server state
.addComponent(clusterControllerService) // service that run cluster controller service
.build();
}