func()

in cmd/aresd/cmd/cmd.go [129:309]


// start bootstraps the service and, in non-distributed mode, serves HTTP
// until the main server stops.
//
// It first touches the device runtime, creates the metrics root scope and
// initializes the shared utilities. When cfg.Cluster.Distributed is set it
// hands off to startDataNode and returns. Otherwise it builds the local
// MetaStore, DiskStore, redo log manager and MemStore, optionally starts a
// schema fetch job (cfg.Cluster.Enable) and a debug HTTP server
// (cfg.DebugPort > 0), registers the API routes, and then blocks until the
// server's done channel yields a value.
//
// httpWrapper, when non-nil, is appended to the wrappers applied to the
// schema, enum, data and query handlers. middleware, when non-nil, wraps the
// final CORS-enabled handler.
func (aresd *AresD) start(
	cfg common.AresServerConfig,
	logger common.Logger,
	queryLogger common.Logger,
	metricsCfg common.Metrics,
	httpWrapper utils.HTTPHandlerWrapper,
	middleware func(http.Handler) http.Handler,
) {
	logger.With("config", cfg).Info("Bootstrapping service")

	// Check whether we have a correct device running environment.
	cgoutils.DeviceFree(unsafe.Pointer(nil), 0)

	// Pause profiler until requested.
	cgoutils.CudaProfilerStop()

	scope, closer, err := metricsCfg.NewRootScope()
	if err != nil {
		logger.Fatal("Failed to create new root scope", err)
	}
	defer closer.Close()

	// Init common components.
	utils.Init(cfg, logger, queryLogger, scope)

	scope.Counter("restart").Inc(1)

	if cfg.Cluster.Distributed {
		startDataNode(cfg, logger, scope, httpWrapper, middleware)
		return
	}

	// TODO keep this path for non-distributed mode, and to avoid code break
	// should be removed later after distributed mode is mature
	serverRestartTimer := scope.Timer("restart").Start()

	// Create MetaStore.
	metaStorePath := filepath.Join(cfg.RootPath, "metastore")
	metaStore, err := metastore.NewDiskMetaStore(metaStorePath)
	if err != nil {
		logger.Panic(err)
	}

	// Create DiskStore.
	diskStore := diskstore.NewLocalDiskStore(cfg.RootPath)

	// Fetch schema from controller and start the periodic fetch job.
	if cfg.Cluster.Enable {
		if cfg.Cluster.Namespace == "" {
			logger.Fatal("Missing namespace")
		}
		controllerClientCfg := cfg.Cluster.Controller
		if controllerClientCfg == nil {
			// No error value exists for this condition; err is nil here
			// because the MetaStore creation above already succeeded.
			logger.Fatal("Missing controller client config")
		}
		if cfg.Cluster.InstanceID != "" {
			// NOTE(review): if Headers is a map-backed type such as
			// http.Header and was left nil by config loading, Add will
			// panic — confirm it is always initialized.
			controllerClientCfg.Headers.Add(controllerCli.InstanceNameHeaderKey, cfg.Cluster.InstanceID)
		}

		controllerClient := controllerCli.NewControllerHTTPClient(controllerClientCfg.Address, time.Duration(controllerClientCfg.TimeoutSec)*time.Second, controllerClientCfg.Headers)
		schemaFetchJob := metastore.NewSchemaFetchJob(30, metaStore, nil, metastore.NewTableSchameValidator(), controllerClient, nil, cfg.Cluster.Namespace, "")
		// Immediate initial fetch, then keep fetching in the background.
		schemaFetchJob.FetchSchema()
		go schemaFetchJob.Run()
	}

	bootstrapToken := bootstrap.NewPeerDataNodeServer(metaStore, diskStore).(memCom.BootStrapToken)

	redoLogManagerMaster, err := redolog.NewRedoLogManagerMaster(cfg.Cluster.Namespace, &cfg.RedoLogConfig, diskStore, metaStore)
	if err != nil {
		utils.GetLogger().Fatal(err)
	}

	// Create MemStore.
	memStore := memstore.NewMemStore(metaStore, diskStore, memstore.NewOptions(bootstrapToken, redoLogManagerMaster))

	// Read schema.
	utils.GetLogger().Infof("Reading schema from local MetaStore %s", metaStorePath)
	err = memStore.FetchSchema()
	if err != nil {
		utils.GetLogger().Fatal(err)
	}

	// Create schema handler.
	schemaHandler := api.NewSchemaHandler(metaStore, memStore)

	// Create enum handler.
	enumHandler := api.NewEnumHandler(memStore, metaStore)

	// Create query handler.
	// The non-distributed version uses a static shard owner for shard 0; the
	// same instance is shared by every consumer below.
	staticShardOwner := topology.NewStaticShardOwner([]int{0})
	queryHandler := api.NewQueryHandler(memStore, staticShardOwner, cfg.Query, cfg.HTTP.MaxQueryConnections)

	// Create health check handler.
	healthCheckHandler := api.NewHealthCheckHandler()

	nodeModulesHandler := http.StripPrefix("/node_modules/", http.FileServer(http.Dir("./api/ui/node_modules/")))

	// Start HTTP server for debugging.
	if cfg.DebugPort > 0 {
		go func() {
			debugHandler := api.NewDebugHandler(cfg.Cluster.Namespace, memStore, metaStore, queryHandler, healthCheckHandler, staticShardOwner, nil)

			debugStaticHandler := http.StripPrefix("/static/", utils.NoCache(
				http.FileServer(http.Dir("./api/ui/debug/"))))
			debugRouter := mux.NewRouter()
			debugHandler.Register(debugRouter.PathPrefix("/dbg").Subrouter())
			schemaHandler.RegisterForDebug(debugRouter.PathPrefix("/schema").Subrouter())

			debugRouter.PathPrefix("/node_modules/").Handler(nodeModulesHandler)
			debugRouter.PathPrefix("/static/").Handler(debugStaticHandler)
			// The specific pprof endpoints are registered before the
			// catch-all /debug/pprof/ prefix that serves the index.
			debugRouter.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
			debugRouter.HandleFunc("/debug/pprof/profile", pprof.Profile)
			debugRouter.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
			debugRouter.HandleFunc("/debug/pprof/trace", pprof.Trace)
			debugRouter.PathPrefix("/debug/pprof/").Handler(http.HandlerFunc(pprof.Index))

			utils.GetLogger().Infof("Starting HTTP server on dbg-port %d", cfg.DebugPort)
			utils.GetLogger().Fatal(http.ListenAndServe(fmt.Sprintf(":%d", cfg.DebugPort), debugRouter))
		}()
	} else {
		utils.GetLogger().Infof("Debug port not configured, debug server will be disabled")
	}

	// Init shards.
	utils.GetLogger().Infof("Initializing shards from local DiskStore %s", cfg.RootPath)
	memStore.InitShards(cfg.SchedulerOff, staticShardOwner)

	// Start serving.
	dataHandler := api.NewDataHandler(memStore, cfg.HTTP.MaxIngestionConnections)
	router := mux.NewRouter()

	metricsLoggingMiddleWareProvider := utils.NewMetricsLoggingMiddleWareProvider(scope, logger)

	httpWrappers := []utils.HTTPHandlerWrapper{metricsLoggingMiddleWareProvider.WithMetrics}
	if httpWrapper != nil {
		httpWrappers = append(httpWrappers, httpWrapper)
	}

	// With cfg.Cluster.Enable set, the schema handler's routes are
	// restricted to GET requests.
	schemaRouter := router.PathPrefix("/schema")
	if cfg.Cluster.Enable {
		schemaRouter = schemaRouter.Methods(http.MethodGet)
	}
	schemaHandler.Register(schemaRouter.Subrouter(), httpWrappers...)
	enumHandler.Register(router.PathPrefix("/schema").Subrouter(), httpWrappers...)
	dataHandler.Register(router.PathPrefix("/data").Subrouter(), httpWrappers...)
	queryHandler.Register(router.PathPrefix("/query").Subrouter(), httpWrappers...)

	swaggerHandler := http.StripPrefix("/swagger/", http.FileServer(http.Dir("./api/ui/swagger/")))
	router.PathPrefix("/swagger/").Handler(swaggerHandler)
	router.PathPrefix("/node_modules/").Handler(nodeModulesHandler)
	router.HandleFunc("/health", utils.ApplyHTTPWrappers(healthCheckHandler.HealthCheck, metricsLoggingMiddleWareProvider.WithMetrics))
	router.HandleFunc("/version", healthCheckHandler.Version)

	// Support CORS calls.
	allowOrigins := handlers.AllowedOrigins([]string{"*"})
	allowHeaders := handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Language", "Origin", "Content-Type"})
	allowMethods := handlers.AllowedMethods([]string{"GET", "PUT", "POST", "DELETE", "OPTIONS"})

	// The restart timer stops here, before the main HTTP server is started.
	serverRestartTimer.Stop()

	batchStatsReporter := memstore.NewBatchStatsReporter(5*60, memStore, staticShardOwner)
	go batchStatsReporter.Run()

	utils.GetLogger().Infof("Starting HTTP server on port %d with max connection %d", cfg.Port, cfg.HTTP.MaxConnections)

	handler := handlers.CORS(allowOrigins, allowHeaders, allowMethods)(router)
	if middleware != nil {
		handler = middleware(handler)
	}
	doneChan, server := utils.LimitServeAsync(cfg.Port, handler, cfg.HTTP)
	aresd.server = server
	// Notify other routines that the server is up.
	// NOTE(review): this send blocks until a receiver is ready unless
	// StartedChan is buffered — confirm against where the channel is created.
	aresd.StartedChan <- struct{}{}
	// Wait for the server to stop, then shut down the background workers.
	utils.GetLogger().Error(<-doneChan)
	batchStatsReporter.Stop()
	redoLogManagerMaster.Stop()
}