in banyand/internal/cmd/storage.go [50:135]
// newStorageCmd assembles the "storage" cobra command.
//
// The data pipeline, metadata client, stream/measure services, query processor
// and observability services are constructed eagerly while the command is being
// built; a construction error is logged at fatal level on the "bootstrap" logger.
// The resulting units are registered on storageGroup, and the command's hooks
// then load the logging configuration, trim the registered units according to
// the selected storage mode, and finally run the group.
func newStorageCmd() *cobra.Command {
	l := logger.GetLogger("bootstrap")
	// mustInit reports a non-nil construction error at fatal level with msg.
	mustInit := func(initErr error, msg string) {
		if initErr != nil {
			l.Fatal().Err(initErr).Msg(msg)
		}
	}

	ctx := context.Background()
	// nolint: staticcheck
	dataQueue, err := queue.NewQueue(ctx)
	mustInit(err, "failed to initiate data pipeline")
	metaClient, err := metadata.NewClient(ctx)
	mustInit(err, "failed to initiate metadata service")
	streamSvc, err := stream.NewService(ctx, metaClient, dataQueue)
	mustInit(err, "failed to initiate stream service")
	measureSvc, err := measure.NewService(ctx, metaClient, dataQueue)
	mustInit(err, "failed to initiate measure service")
	// TODO: remove streamSVC and measureSvc from query processor. To use metaSvc instead.
	queryProc, err := query.NewService(ctx, streamSvc, measureSvc, metaClient, dataQueue)
	mustInit(err, "failed to initiate query processor")

	profiler := observability.NewProfService()
	metrics := observability.NewMetricService()

	// Collect the units in registration order; the metric service is appended
	// only when one was returned.
	units := make([]run.Unit, 0, 7)
	units = append(units, new(signal.Handler), dataQueue, measureSvc, streamSvc, queryProc, profiler)
	if metrics != nil {
		units = append(units, metrics)
	}
	// Register the units on the run group.
	storageGroup.Register(units...)

	// logging is filled in by the logging-* flags below and read by the
	// PersistentPreRunE hook when the command executes.
	var logging logger.Logging
	storageCmd := &cobra.Command{
		Use:     "storage",
		Version: version.Build(),
		Short:   "Run as the storage server",
		// Load the "logging" configuration from the flags, then initialise the logger.
		PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
			if loadErr := config.Load("logging", cmd.Flags()); loadErr != nil {
				return loadErr
			}
			return logger.Init(logging)
		},
		// Deregister the units that the selected storage mode does not need.
		PreRun: func(_ *cobra.Command, _ []string) {
			switch flagStorageMode {
			case storageModeMix:
				// Mix mode keeps every registered unit.
			case storageModeData:
				storageGroup.Deregister(queryProc)
			case storageModeQuery:
				storageGroup.Deregister(streamSvc)
				storageGroup.Deregister(measureSvc)
			default:
				l.Fatal().Str("mode", flagStorageMode).Msg("unknown storage mode")
			}
		},
		// Print the logo and run the group; a run error is logged and the
		// process exits with status -1 instead of returning the error to cobra.
		RunE: func(_ *cobra.Command, _ []string) error {
			fmt.Print(logo)
			logger.GetLogger().Info().Msg("starting as a storage server")
			// Spawn our go routines and wait for shutdown.
			runErr := storageGroup.Run()
			if runErr == nil {
				return nil
			}
			logger.GetLogger().Error().Err(runErr).Stack().Str("name", storageGroup.Name()).Msg("Exit")
			os.Exit(-1)
			return nil
		},
	}

	flags := storageCmd.Flags()
	flags.StringVar(&logging.Env, "logging-env", "prod", "the logging")
	flags.StringVar(&logging.Level, "logging-level", "info", "the root level of logging")
	flags.StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module")
	flags.StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging")
	flags.StringVarP(&flagStorageMode, "mode", "m", storageModeMix, "the storage mode, one of [data, query, mix]")
	// Expose the flags contributed by the registered units on this command.
	flags.AddFlagSet(storageGroup.RegisterFlags().FlagSet)
	return storageCmd
}