in cmd/broker/cmd/cmd.go [81:187]
// start wires up and runs the aresdb broker service.
//
// It creates the metrics root scope, initializes the shared utils package,
// builds the etcd config-service client and the controller HTTP client,
// performs an initial schema/enum fetch and then keeps fetching in a
// background goroutine, creates the health-tracking dynamic topology and the
// query executor, registers the query handler under "/query", and finally
// hands the CORS-wrapped router to utils.LimitServe.
//
// Parameters:
//   - cfg: broker configuration (cluster, etcd, HTTP and port settings).
//   - logger, queryLogger: loggers passed to utils.Init; logger is also used
//     for startup messages and fatal errors.
//   - metricsCfg: factory for the metrics root scope.
//   - httpWrapper: optional extra wrapper applied to the query handlers; nil
//     means only the metrics wrapper is used.
//   - middleware: optional middleware wrapped around the whole HTTP handler;
//     nil means none.
//
// Any setup failure is reported through logger.Fatal.
func start(
	cfg config.BrokerConfig,
	logger common.Logger,
	queryLogger common.Logger,
	metricsCfg common.Metrics,
	httpWrapper utils.HTTPHandlerWrapper,
	middleware func(http.Handler) http.Handler,
) {
	logger.With("config", cfg).Info("Starting aresdb broker service")

	scope, closer, err := metricsCfg.NewRootScope()
	if err != nil {
		logger.Fatal("Failed to create new root scope", err)
	}
	defer closer.Close()

	// Init common components.
	utils.Init(common.AresServerConfig{}, logger, queryLogger, scope)

	scope.Counter("restart").Inc(1)
	// NOTE(review): the deferred Stop only fires when start returns; if
	// utils.LimitServe blocks for the lifetime of the server, this timer
	// covers more than just startup — confirm that is intended.
	serverRestartTimer := scope.Timer("restart").Start()
	defer serverRestartTimer.Stop()

	// fetch and keep syncing schema
	controllerClientCfg := cfg.Cluster.Controller
	if controllerClientCfg == nil {
		// err is always nil at this point, so it is deliberately not logged.
		logger.Fatal("Missing controller client config")
	}

	var (
		topo        topology.HealthTrackingDynamicTopoloy
		clusterName = cfg.Cluster.Namespace
		serviceName = utils.BrokerServiceName(clusterName)
		store       kv.TxnStore
	)

	cfg.Cluster.Etcd.Service = serviceName
	// NOTE(review): the etcd client and the table schema mutator below log
	// through zap.NewExample() rather than the supplied logger — confirm.
	configServiceCli, err := cfg.Cluster.Etcd.NewClient(
		instrument.NewOptions().SetLogger(zap.NewExample()))
	if err != nil {
		logger.Fatal("Failed to create config service client,", err)
	}

	controllerClient := client.NewControllerHTTPClient(
		controllerClientCfg.Address,
		time.Duration(controllerClientCfg.TimeoutSec)*time.Second,
		controllerClientCfg.Headers,
	)

	brokerSchemaMutator := broker.NewBrokerSchemaMutator()

	store, err = configServiceCli.Txn()
	if err != nil {
		// Include the underlying error so the failure cause is not lost.
		logger.Fatal("Failed to get kv store", err)
	}

	schemaFetchJob := metastore.NewSchemaFetchJob(
		10,
		brokerSchemaMutator,
		brokerSchemaMutator,
		metastore.NewTableSchameValidator(),
		controllerClient,
		controllerEtcd.NewEnumMutator(
			store, controllerEtcd.NewTableSchemaMutator(
				store,
				zap.NewExample().Sugar(),
			),
		),
		clusterName,
		"",
	)
	// Fetch once synchronously before serving, then keep the job running in
	// the background.
	schemaFetchJob.FetchSchema()
	schemaFetchJob.FetchEnum()
	go schemaFetchJob.Run()

	serviceID := services.NewServiceID().
		SetZone(cfg.Cluster.Etcd.Zone).
		SetName(serviceName).
		SetEnvironment(cfg.Cluster.Etcd.Env)
	dynamicOptions := topology.NewDynamicOptions().
		SetConfigServiceClient(configServiceCli).
		SetServiceID(serviceID)
	topo, err = topology.NewHealthTrackingDynamicTopology(dynamicOptions)
	if err != nil {
		logger.Fatal("Failed to create health tracking dynamic topology,", err)
	}

	// executor
	exec := broker.NewQueryExecutor(brokerSchemaMutator, topo, dataNodeCli.NewDataNodeQueryClient(logger))

	// init handlers
	queryHandler := broker.NewQueryHandler(exec, cfg.Cluster.InstanceID)

	// start HTTP server
	router := mux.NewRouter()

	metricsLoggingMiddlewareProvider := utils.NewMetricsLoggingMiddleWareProvider(scope, logger)
	httpWrappers := []utils.HTTPHandlerWrapper{metricsLoggingMiddlewareProvider.WithMetrics}
	if httpWrapper != nil {
		httpWrappers = append(httpWrappers, httpWrapper)
	}

	queryHandler.Register(router.PathPrefix("/query").Subrouter(), httpWrappers...)

	// Support CORS calls.
	allowOrigins := handlers.AllowedOrigins([]string{"*"})
	allowHeaders := handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Language", "Origin", "Content-Type"})
	allowMethods := handlers.AllowedMethods([]string{"GET", "PUT", "POST", "DELETE", "OPTIONS"})

	utils.GetLogger().Infof("Starting HTTP server on port %d with max connection %d", cfg.Port, cfg.HTTP.MaxConnections)

	handler := handlers.CORS(allowOrigins, allowHeaders, allowMethods)(router)
	if middleware != nil {
		handler = middleware(handler)
	}
	utils.LimitServe(cfg.Port, handler, cfg.HTTP)
}