func start()

in cmd/broker/cmd/cmd.go [81:187]


// start wires up and runs the aresdb broker service.
//
// It creates the root metrics scope, initializes the shared utils package,
// builds the etcd-backed config service client and the controller HTTP
// client, performs an initial schema/enum fetch and then keeps syncing in a
// background goroutine, creates the health tracking dynamic topology and the
// query executor, registers the query handler under "/query", and finally
// serves HTTP (with permissive CORS) through utils.LimitServe.
//
// Parameters:
//   - cfg: broker configuration (port, HTTP limits, cluster/etcd/controller).
//   - logger, queryLogger: loggers handed to utils.Init; logger is also used
//     for startup diagnostics.
//   - metricsCfg: source of the root metrics scope.
//   - httpWrapper: optional extra wrapper applied to the query handlers; nil
//     means only the metrics wrapper is used.
//   - middleware: optional middleware wrapped around the whole router; nil
//     means none.
//
// Any setup failure is reported through logger.Fatal.
func start(
	cfg config.BrokerConfig,
	logger common.Logger,
	queryLogger common.Logger,
	metricsCfg common.Metrics,
	httpWrapper utils.HTTPHandlerWrapper,
	middleware func(http.Handler) http.Handler,
) {
	logger.With("config", cfg).Info("Starting aresdb broker service")

	scope, closer, err := metricsCfg.NewRootScope()
	if err != nil {
		logger.Fatal("Failed to create new root scope", err)
	}
	// NOTE(review): if logger.Fatal terminates the process, this deferred
	// Close will not run on the fatal paths below — confirm that is acceptable.
	defer closer.Close()

	// Init common components.
	utils.Init(common.AresServerConfig{}, logger, queryLogger, scope)

	scope.Counter("restart").Inc(1)
	serverRestartTimer := scope.Timer("restart").Start()
	// NOTE(review): Stop is deferred to function return, i.e. after
	// utils.LimitServe returns. If LimitServe blocks for the server lifetime,
	// this timer measures uptime rather than startup time — confirm intent.
	defer serverRestartTimer.Stop()

	// fetch and keep syncing schema
	controllerClientCfg := cfg.Cluster.Controller
	if controllerClientCfg == nil {
		// No error value exists here; err from NewRootScope is already known
		// to be nil, so it is intentionally not passed along.
		logger.Fatal("Missing controller client config")
	}

	var (
		topo        topology.HealthTrackingDynamicTopoloy
		clusterName = cfg.Cluster.Namespace
		serviceName = utils.BrokerServiceName(clusterName)
		store       kv.TxnStore
	)

	// The etcd client is created on behalf of this broker's service name.
	cfg.Cluster.Etcd.Service = serviceName
	configServiceCli, err := cfg.Cluster.Etcd.NewClient(
		instrument.NewOptions().SetLogger(zap.NewExample()))
	if err != nil {
		logger.Fatal("Failed to create config service client,", err)
	}

	controllerClient := client.NewControllerHTTPClient(
		controllerClientCfg.Address,
		time.Duration(controllerClientCfg.TimeoutSec)*time.Second,
		controllerClientCfg.Headers,
	)
	brokerSchemaMutator := broker.NewBrokerSchemaMutator()

	store, err = configServiceCli.Txn()
	if err != nil {
		// Include the underlying error so the failure cause is not lost.
		logger.Fatal("Failed to get kv store", err)
	}

	schemaFetchJob := metastore.NewSchemaFetchJob(
		10, // presumably the fetch interval in seconds — TODO confirm
		brokerSchemaMutator,
		brokerSchemaMutator,
		metastore.NewTableSchameValidator(),
		controllerClient,
		controllerEtcd.NewEnumMutator(
			store, controllerEtcd.NewTableSchemaMutator(
				store,
				zap.NewExample().Sugar(),
			),
		),
		clusterName,
		"",
	)
	// Fetch once synchronously before serving, then keep syncing in the
	// background.
	schemaFetchJob.FetchSchema()
	schemaFetchJob.FetchEnum()
	go schemaFetchJob.Run()

	serviceID := services.NewServiceID().
		SetZone(cfg.Cluster.Etcd.Zone).
		SetName(serviceName).
		SetEnvironment(cfg.Cluster.Etcd.Env)
	dynamicOptions := topology.NewDynamicOptions().
		SetConfigServiceClient(configServiceCli).
		SetServiceID(serviceID)
	topo, err = topology.NewHealthTrackingDynamicTopology(dynamicOptions)
	if err != nil {
		logger.Fatal("Failed to create health tracking dynamic topology,", err)
	}

	// executor
	exec := broker.NewQueryExecutor(brokerSchemaMutator, topo, dataNodeCli.NewDataNodeQueryClient(logger))

	// init handlers
	queryHandler := broker.NewQueryHandler(exec, cfg.Cluster.InstanceID)

	// start HTTP server
	router := mux.NewRouter()
	metricsLoggingMiddlewareProvider := utils.NewMetricsLoggingMiddleWareProvider(scope, logger)
	// The metrics wrapper is always installed; the caller-supplied wrapper is
	// appended after it when present.
	httpWrappers := []utils.HTTPHandlerWrapper{metricsLoggingMiddlewareProvider.WithMetrics}
	if httpWrapper != nil {
		httpWrappers = append(httpWrappers, httpWrapper)
	}
	queryHandler.Register(router.PathPrefix("/query").Subrouter(), httpWrappers...)

	// Support CORS calls.
	allowOrigins := handlers.AllowedOrigins([]string{"*"})
	allowHeaders := handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Language", "Origin", "Content-Type"})
	allowMethods := handlers.AllowedMethods([]string{"GET", "PUT", "POST", "DELETE", "OPTIONS"})

	utils.GetLogger().Infof("Starting HTTP server on port %d with max connection %d", cfg.Port, cfg.HTTP.MaxConnections)
	handler := handlers.CORS(allowOrigins, allowHeaders, allowMethods)(router)
	if middleware != nil {
		// Caller-supplied middleware wraps the CORS-enabled router.
		handler = middleware(handler)
	}
	utils.LimitServe(cfg.Port, handler, cfg.HTTP)
}