in cmd/aresd/cmd/cmd.go [129:309]
// start bootstraps the service and then blocks until the HTTP server's done
// channel yields a value.
//
// It first probes the device environment, creates the metrics root scope and
// initializes the shared utilities. When cfg.Cluster.Distributed is set it
// delegates to startDataNode and returns. Otherwise it runs the
// non-distributed path: it creates the MetaStore and DiskStore under
// cfg.RootPath, optionally starts the controller schema fetch job
// (cfg.Cluster.Enable), creates the MemStore, optionally serves the debug
// endpoints on cfg.DebugPort, initializes shards, serves the schema, data and
// query APIs on cfg.Port, signals aresd.StartedChan, and finally stops the
// batch stats reporter and the redo log manager once the server has stopped.
//
// httpWrapper, when non-nil, is appended to the wrappers applied to the
// schema, enum, data and query handlers. middleware, when non-nil, wraps the
// CORS-enabled router handler.
func (aresd *AresD) start(
	cfg common.AresServerConfig,
	logger common.Logger,
	queryLogger common.Logger,
	metricsCfg common.Metrics,
	httpWrapper utils.HTTPHandlerWrapper,
	middleware func(http.Handler) http.Handler,
) {
	logger.With("config", cfg).Info("Bootstrapping service")

	// Check whether we have a correct device running environment
	cgoutils.DeviceFree(unsafe.Pointer(nil), 0)

	// Pause profiler until requested
	cgoutils.CudaProfilerStop()

	scope, closer, err := metricsCfg.NewRootScope()
	if err != nil {
		logger.Fatal("Failed to create new root scope", err)
	}
	defer closer.Close()

	// Init common components.
	utils.Init(cfg, logger, queryLogger, scope)

	scope.Counter("restart").Inc(1)

	if cfg.Cluster.Distributed {
		startDataNode(cfg, logger, scope, httpWrapper, middleware)
		return
	}

	// TODO keep this path for non-distributed mode, and to avoid code break
	// should be removed later after distributed mode is mature
	serverRestartTimer := scope.Timer("restart").Start()

	// Create MetaStore.
	metaStorePath := filepath.Join(cfg.RootPath, "metastore")
	metaStore, err := metastore.NewDiskMetaStore(metaStorePath)
	if err != nil {
		logger.Panic(err)
	}

	// Create DiskStore.
	diskStore := diskstore.NewLocalDiskStore(cfg.RootPath)

	// fetch schema from controller and start periodical job
	if cfg.Cluster.Enable {
		// NOTE(review): the checks below rely on logger.Fatal terminating the
		// process; execution would otherwise continue with invalid config.
		if cfg.Cluster.Namespace == "" {
			logger.Fatal("Missing namespace")
		}

		controllerClientCfg := cfg.Cluster.Controller
		if controllerClientCfg == nil {
			// err is always nil at this point (it was checked after
			// NewDiskMetaStore), so it is deliberately not logged here.
			logger.Fatal("Missing controller client config")
		}

		if cfg.Cluster.InstanceID != "" {
			// NOTE(review): assumes controllerClientCfg.Headers is non-nil
			// when an instance ID is configured — confirm against the config
			// loader.
			controllerClientCfg.Headers.Add(controllerCli.InstanceNameHeaderKey, cfg.Cluster.InstanceID)
		}

		controllerClient := controllerCli.NewControllerHTTPClient(controllerClientCfg.Address, time.Duration(controllerClientCfg.TimeoutSec)*time.Second, controllerClientCfg.Headers)
		// NOTE(review): 30 is presumably the fetch interval in seconds — confirm.
		schemaFetchJob := metastore.NewSchemaFetchJob(30, metaStore, nil, metastore.NewTableSchameValidator(), controllerClient, nil, cfg.Cluster.Namespace, "")
		// immediate initial fetch
		schemaFetchJob.FetchSchema()
		go schemaFetchJob.Run()
	}

	bootstrapToken := bootstrap.NewPeerDataNodeServer(metaStore, diskStore).(memCom.BootStrapToken)

	redoLogManagerMaster, err := redolog.NewRedoLogManagerMaster(cfg.Cluster.Namespace, &cfg.RedoLogConfig, diskStore, metaStore)
	if err != nil {
		utils.GetLogger().Fatal(err)
	}

	// Create MemStore.
	memStore := memstore.NewMemStore(metaStore, diskStore, memstore.NewOptions(bootstrapToken, redoLogManagerMaster))

	// Read schema.
	utils.GetLogger().Infof("Reading schema from local MetaStore %s", metaStorePath)
	err = memStore.FetchSchema()
	if err != nil {
		utils.GetLogger().Fatal(err)
	}

	// create schema handler
	schemaHandler := api.NewSchemaHandler(metaStore, memStore)

	// create enum handler
	enumHandler := api.NewEnumHandler(memStore, metaStore)

	// create query handler.
	// static shard owner with non distributed version
	staticShardOwner := topology.NewStaticShardOwner([]int{0})
	queryHandler := api.NewQueryHandler(memStore, staticShardOwner, cfg.Query, cfg.HTTP.MaxQueryConnections)

	// create health check handler.
	healthCheckHandler := api.NewHealthCheckHandler()

	// Shared by the debug router and the main router.
	nodeModulesHandler := http.StripPrefix("/node_modules/", http.FileServer(http.Dir("./api/ui/node_modules/")))

	// Start HTTP server for debugging.
	if cfg.DebugPort > 0 {
		// The debug server runs in its own goroutine; a ListenAndServe
		// failure is reported through the logger's Fatal.
		go func() {
			debugHandler := api.NewDebugHandler(cfg.Cluster.Namespace, memStore, metaStore, queryHandler, healthCheckHandler, staticShardOwner, nil)

			debugStaticHandler := http.StripPrefix("/static/", utils.NoCache(
				http.FileServer(http.Dir("./api/ui/debug/"))))
			debugRouter := mux.NewRouter()
			debugHandler.Register(debugRouter.PathPrefix("/dbg").Subrouter())
			schemaHandler.RegisterForDebug(debugRouter.PathPrefix("/schema").Subrouter())

			debugRouter.PathPrefix("/node_modules/").Handler(nodeModulesHandler)
			debugRouter.PathPrefix("/static/").Handler(debugStaticHandler)
			// The specific pprof endpoints are registered before the
			// catch-all /debug/pprof/ prefix, which serves pprof.Index.
			debugRouter.HandleFunc("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
			debugRouter.HandleFunc("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
			debugRouter.HandleFunc("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
			debugRouter.HandleFunc("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
			debugRouter.PathPrefix("/debug/pprof/").Handler(http.HandlerFunc(pprof.Index))

			utils.GetLogger().Infof("Starting HTTP server on dbg-port %d", cfg.DebugPort)
			utils.GetLogger().Fatal(http.ListenAndServe(fmt.Sprintf(":%d", cfg.DebugPort), debugRouter))
		}()
	} else {
		utils.GetLogger().Infof("Debug port not configured, debug server will be disabled")
	}

	// Init shards.
	utils.GetLogger().Infof("Initializing shards from local DiskStore %s", cfg.RootPath)
	memStore.InitShards(cfg.SchedulerOff, topology.NewStaticShardOwner([]int{0}))

	// Start serving.
	dataHandler := api.NewDataHandler(memStore, cfg.HTTP.MaxIngestionConnections)

	router := mux.NewRouter()

	// Metrics wrapper always comes first; the caller-supplied wrapper, if
	// any, is appended after it.
	metricsLoggingMiddleWareProvider := utils.NewMetricsLoggingMiddleWareProvider(scope, logger)
	httpWrappers := []utils.HTTPHandlerWrapper{metricsLoggingMiddleWareProvider.WithMetrics}
	if httpWrapper != nil {
		httpWrappers = append(httpWrappers, httpWrapper)
	}

	schemaRouter := router.PathPrefix("/schema")
	if cfg.Cluster.Enable {
		// With cluster mode enabled, the schema handler's routes only match
		// GET requests.
		schemaRouter = schemaRouter.Methods(http.MethodGet)
	}

	schemaHandler.Register(schemaRouter.Subrouter(), httpWrappers...)
	enumHandler.Register(router.PathPrefix("/schema").Subrouter(), httpWrappers...)
	dataHandler.Register(router.PathPrefix("/data").Subrouter(), httpWrappers...)
	queryHandler.Register(router.PathPrefix("/query").Subrouter(), httpWrappers...)

	swaggerHandler := http.StripPrefix("/swagger/", http.FileServer(http.Dir("./api/ui/swagger/")))
	router.PathPrefix("/swagger/").Handler(swaggerHandler)
	router.PathPrefix("/node_modules/").Handler(nodeModulesHandler)
	router.HandleFunc("/health", utils.ApplyHTTPWrappers(healthCheckHandler.HealthCheck, metricsLoggingMiddleWareProvider.WithMetrics))
	router.HandleFunc("/version", healthCheckHandler.Version)

	// Support CORS calls.
	allowOrigins := handlers.AllowedOrigins([]string{"*"})
	allowHeaders := handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Language", "Origin", "Content-Type"})
	allowMethods := handlers.AllowedMethods([]string{"GET", "PUT", "POST", "DELETE", "OPTIONS"})

	// The restart timer covers bootstrap only; it is stopped before the
	// main HTTP server starts serving.
	serverRestartTimer.Stop()

	batchStatsReporter := memstore.NewBatchStatsReporter(5*60, memStore, topology.NewStaticShardOwner([]int{0}))
	go batchStatsReporter.Run()

	utils.GetLogger().Infof("Starting HTTP server on port %d with max connection %d", cfg.Port, cfg.HTTP.MaxConnections)
	handler := handlers.CORS(allowOrigins, allowHeaders, allowMethods)(router)
	if middleware != nil {
		handler = middleware(handler)
	}
	doneChan, server := utils.LimitServeAsync(cfg.Port, handler, cfg.HTTP)
	aresd.server = server

	// notify other routes that the server is up
	aresd.StartedChan <- struct{}{}

	// waiting for the server to stop
	utils.GetLogger().Error(<-doneChan)

	// Stop background workers once the server has stopped.
	batchStatsReporter.Stop()
	redoLogManagerMaster.Stop()
}