public static LifecycleComponent buildStorageServer()

in stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java [205:417]


    public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
                                                        int grpcPort,
                                                        boolean useHostname,
                                                        boolean startBookieAndStartProvider,
                                                        StatsLogger externalStatsLogger)
            throws Exception {

        final ComponentInfoPublisher componentInfoPublisher = new ComponentInfoPublisher();

        final Supplier<BookieServiceInfo> bookieServiceInfoProvider =
                () -> buildBookieServiceInfo(componentInfoPublisher);

        LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder()
            .withName("storage-server")
            .withComponentInfoPublisher(componentInfoPublisher);

        BookieConfiguration bkConf = BookieConfiguration.of(conf);
        bkConf.validate();

        DLConfiguration dlConf = DLConfiguration.of(conf);
        dlConf.validate();

        StorageServerConfiguration serverConf = StorageServerConfiguration.of(conf);
        serverConf.validate();

        StorageConfiguration storageConf = new StorageConfiguration(conf);
        storageConf.validate();

        // Get my local endpoint
        Endpoint myEndpoint = createLocalEndpoint(grpcPort, useHostname);

        // Create shared resources
        StorageResources storageResources = StorageResources.create();

        // Create the stats provider
        StatsLogger rootStatsLogger;
        StatsProviderService statsProviderService = null;
        if (startBookieAndStartProvider) {
            statsProviderService = new StatsProviderService(bkConf);
            rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");
            serverBuilder.addComponent(statsProviderService);
            log.info("Bookie configuration : {}", bkConf.asJson());
        } else {
            rootStatsLogger = checkNotNull(externalStatsLogger,
                "External stats logger is not provided while not starting stats provider");
        }

        // dump configurations
        log.info("Dlog configuration : {}", dlConf.asJson());
        log.info("Storage configuration : {}", storageConf.asJson());
        log.info("Server configuration : {}", serverConf.asJson());

        // Create the bookie service
        ServerConfiguration bkServerConf;
        if (startBookieAndStartProvider) {
            BookieService bookieService = new BookieService(bkConf, rootStatsLogger, bookieServiceInfoProvider);
            serverBuilder.addComponent(bookieService);
            bkServerConf = bookieService.serverConf();

            // Build http service
            if (bkServerConf.isHttpServerEnabled()) {
                MetadataBookieDriver metadataDriver = BookieResources.createMetadataDriver(bkServerConf,
                        rootStatsLogger);
                serverBuilder.addComponent(new AutoCloseableLifecycleComponent("metadataDriver",
                        metadataDriver));
                LedgerManagerFactory ledgerManagerFactory = metadataDriver.getLedgerManagerFactory();
                serverBuilder.addComponent(new AutoCloseableLifecycleComponent("lmFactory",
                        ledgerManagerFactory));

                BKHttpServiceProvider provider = new BKHttpServiceProvider.Builder()
                        .setBookieServer(bookieService.getServer())
                        .setServerConfiguration(bkServerConf)
                        .setStatsProvider(statsProviderService.getStatsProvider())
                        .setLedgerManagerFactory(ledgerManagerFactory)
                        .build();
                HttpService httpService =
                        new HttpService(provider,
                                new org.apache.bookkeeper.server.conf.BookieConfiguration(bkServerConf),
                                rootStatsLogger);
                serverBuilder.addComponent(httpService);
                log.info("Load lifecycle component : {}", HttpService.class.getName());
            }

        } else {
            bkServerConf = new ServerConfiguration();
            bkServerConf.loadConf(bkConf.getUnderlyingConf());
        }

        // Create the bookie watch service
        BookieWatchService bkWatchService;
        {
            DistributedLogConfiguration dlogConf = new DistributedLogConfiguration();
            dlogConf.loadConf(dlConf);
            bkWatchService = new BookieWatchService(
                dlogConf.getEnsembleSize(),
                bkConf,
                NullStatsLogger.INSTANCE);
        }

        // Create the curator provider service
        CuratorProviderService curatorProviderService = new CuratorProviderService(
            bkServerConf, dlConf, rootStatsLogger.scope("curator"));

        // Create the distributedlog namespace service
        DLNamespaceProviderService dlNamespaceProvider = new DLNamespaceProviderService(
            bkServerConf,
            dlConf,
            rootStatsLogger.scope("dlog"));

        // client settings for the proxy channels
        StorageClientSettings proxyClientSettings = StorageClientSettings.newBuilder()
            .serviceUri("bk://localhost:" + grpcPort)
            .build();
        // Create range (stream) store
        StorageContainerStoreBuilder storageContainerStoreBuilder = StorageContainerStoreBuilder.newBuilder()
            .withStatsLogger(rootStatsLogger.scope("storage"))
            .withStorageConfiguration(storageConf)
            // the storage resources shared across multiple components
            .withStorageResources(storageResources)
            // the placement policy
            .withStorageContainerPlacementPolicyFactory(() -> {
                long numStorageContainers;
                try (ZkClusterMetadataStore store = new ZkClusterMetadataStore(
                    curatorProviderService.get(),
                    ZKMetadataDriverBase.resolveZkServers(bkServerConf),
                    ZK_METADATA_ROOT_PATH)) {
                    numStorageContainers = store.getClusterMetadata().getNumStorageContainers();
                }
                return StorageContainerPlacementPolicyImpl.of((int) numStorageContainers);
            })
            // the default log backend uri
            .withDefaultBackendUri(dlNamespaceProvider.getDlogUri())
            // with zk-based storage container manager
            .withStorageContainerManagerFactory((storeConf, registry) ->
                new ZkStorageContainerManager(
                    myEndpoint,
                    storageConf,
                    new ZkClusterMetadataStore(
                        curatorProviderService.get(),
                        ZKMetadataDriverBase.resolveZkServers(bkServerConf),
                        ZK_METADATA_ROOT_PATH),
                    registry,
                    rootStatsLogger.scope("sc").scope("manager")))
            // with the inter storage container client manager
            .withRangeStoreFactory(
                new MVCCStoreFactoryImpl(
                    dlNamespaceProvider,
                    () -> new DLCheckpointStore(dlNamespaceProvider.get()),
                    storageConf.getRangeStoreDirs(),
                    storageResources,
                    storageConf.getServeReadOnlyTables(), storageConf))
            // with client manager for proxying grpc requests
            .withStorageServerClientManager(() -> new StorageServerClientManagerImpl(
                proxyClientSettings,
                storageResources.scheduler(),
                StorageServerChannel.factory(proxyClientSettings)
                    // intercept the channel to attach routing header
                    .andThen(channel -> channel.intercept(new RoutingHeaderProxyInterceptor()))
            ));
        StorageService storageService = new StorageService(
            storageConf, storageContainerStoreBuilder, rootStatsLogger.scope("storage"));

        // Create gRPC server
        StatsLogger rpcStatsLogger = rootStatsLogger.scope("grpc");
        GrpcServerSpec serverSpec = GrpcServerSpec.builder()
            .storeSupplier(storageService)
            .storeServerConf(serverConf)
            .endpoint(myEndpoint)
            .statsLogger(rpcStatsLogger)
            .build();
        GrpcService grpcService = new GrpcService(
            serverConf, serverSpec, rpcStatsLogger);

        // Create a registration service provider
        RegistrationServiceProvider regService = new RegistrationServiceProvider(
            bkServerConf,
            dlConf,
            rootStatsLogger.scope("registration").scope("provider"));

        // Create a registration state service only when service is ready.
        RegistrationStateService regStateService = new RegistrationStateService(
            myEndpoint,
            bkServerConf,
            bkConf,
            regService,
            rootStatsLogger.scope("registration"));

        // Create a cluster controller service
        ClusterControllerService clusterControllerService = new ClusterControllerService(
            storageConf,
            () -> new ClusterControllerImpl(
                new ZkClusterMetadataStore(
                    curatorProviderService.get(),
                    ZKMetadataDriverBase.resolveZkServers(bkServerConf),
                    ZK_METADATA_ROOT_PATH),
                regService.get(),
                new DefaultStorageContainerController(),
                new ZkClusterControllerLeaderSelector(curatorProviderService.get(), ZK_METADATA_ROOT_PATH),
                storageConf),
            rootStatsLogger.scope("cluster_controller"));

        // Create all the service stack
        return serverBuilder
            .addComponent(bkWatchService)           // service that watches bookies
            .addComponent(curatorProviderService)   // service that provides curator client
            .addComponent(dlNamespaceProvider)      // service that provides dl namespace
            .addComponent(storageService)           // range (stream) store
            .addComponent(grpcService)              // range (stream) server (gRPC)
            .addComponent(regService)               // service that provides registration client
            .addComponent(regStateService)          // service that manages server state
            .addComponent(clusterControllerService) // service that run cluster controller service
            .build();
    }