func newStorageCmd()

in banyand/internal/cmd/storage.go [50:135]


// newStorageCmd builds the services of a storage server, registers them as
// units of storageGroup and returns the "storage" cobra command that runs the
// group.
//
// All services are constructed while the command is being built, not when it
// is executed. A construction failure is reported through the Fatal event of
// the "bootstrap" logger.
func newStorageCmd() *cobra.Command {
	bootLog := logger.GetLogger("bootstrap")
	rootCtx := context.Background()

	// nolint: staticcheck
	dataQueue, err := queue.NewQueue(rootCtx)
	if err != nil {
		bootLog.Fatal().Err(err).Msg("failed to initiate data pipeline")
	}

	metaSvc, err := metadata.NewClient(rootCtx)
	if err != nil {
		bootLog.Fatal().Err(err).Msg("failed to initiate metadata service")
	}

	streamSvc, err := stream.NewService(rootCtx, metaSvc, dataQueue)
	if err != nil {
		bootLog.Fatal().Err(err).Msg("failed to initiate stream service")
	}

	measureSvc, err := measure.NewService(rootCtx, metaSvc, dataQueue)
	if err != nil {
		bootLog.Fatal().Err(err).Msg("failed to initiate measure service")
	}

	// TODO: stop passing streamSvc and measureSvc to the query processor; it
	// should rely on metaSvc instead.
	querySvc, err := query.NewService(rootCtx, streamSvc, measureSvc, metaSvc, dataQueue)
	if err != nil {
		bootLog.Fatal().Err(err).Msg("failed to initiate query processor")
	}

	profSvc := observability.NewProfService()
	metricSvc := observability.NewMetricService()

	units := []run.Unit{new(signal.Handler), dataQueue, measureSvc, streamSvc, querySvc, profSvc}
	// The metric service is only added to the group when one was created.
	if metricSvc != nil {
		units = append(units, metricSvc)
	}
	// Register the units with the run group.
	storageGroup.Register(units...)

	// logCfg is filled in by the logging-* flags declared below and read by
	// PersistentPreRunE when the command is executed.
	var logCfg logger.Logging

	storageCmd := &cobra.Command{
		Use:     "storage",
		Short:   "Run as the storage server",
		Version: version.Build(),
		// Load the "logging" configuration section from the flags, then
		// initialize the logger with the resulting settings.
		PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
			if loadErr := config.Load("logging", cmd.Flags()); loadErr != nil {
				return loadErr
			}
			return logger.Init(logCfg)
		},
		// Trim the run group down to the units required by the selected mode.
		PreRun: func(_ *cobra.Command, _ []string) {
			switch flagStorageMode {
			case storageModeMix:
				// Mix mode keeps every registered unit.
			case storageModeData:
				storageGroup.Deregister(querySvc)
			case storageModeQuery:
				storageGroup.Deregister(streamSvc)
				storageGroup.Deregister(measureSvc)
			default:
				bootLog.Fatal().Str("mode", flagStorageMode).Msg("unknown storage mode")
			}
		},
		RunE: func(_ *cobra.Command, _ []string) error {
			fmt.Print(logo)
			logger.GetLogger().Info().Msg("starting as a storage server")
			// Start the group's goroutines and block until shutdown.
			runErr := storageGroup.Run()
			if runErr == nil {
				return nil
			}
			// A failed run is logged and ends the process directly instead of
			// being returned to cobra.
			logger.GetLogger().Error().Err(runErr).Stack().Str("name", storageGroup.Name()).Msg("Exit")
			os.Exit(-1)
			return nil
		},
	}

	flags := storageCmd.Flags()
	flags.StringVar(&logCfg.Env, "logging-env", "prod", "the logging")
	flags.StringVar(&logCfg.Level, "logging-level", "info", "the root level of logging")
	flags.StringArrayVar(&logCfg.Modules, "logging-modules", nil, "the specific module")
	flags.StringArrayVar(&logCfg.Levels, "logging-levels", nil, "the level logging of logging")
	flags.StringVarP(&flagStorageMode, "mode", "m", storageModeMix, "the storage mode, one of [data, query, mix]")
	// Expose the flags contributed by the units registered in the run group.
	flags.AddFlagSet(storageGroup.RegisterFlags().FlagSet)
	return storageCmd
}