func newLiaisonCmd()

in pkg/cmdsetup/liaison.go [41:119]


// newLiaisonCmd assembles the "liaison" cobra command.
//
// It constructs the metadata client, the publish and local pipelines, the
// round-robin node selectors and their cluster node registries, the metric,
// distributed query, gRPC, HTTP and profiling services, and registers all of
// them in a run group named "liaison". Units passed in runners are registered
// ahead of the built-in ones.
//
// A failure to create the metadata client or the distributed query service is
// reported through the bootstrap logger at fatal level.
func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
	bootLog := logger.GetLogger("bootstrap")
	bootCtx := context.Background()

	metaSvc, err := metadata.NewClient(true, false)
	if err != nil {
		bootLog.Fatal().Err(err).Msg("failed to initiate metadata service")
	}

	pipeline := pub.New(metaSvc)
	localPipeline := queue.Local()

	// The measure registry is created first because the metric service needs it.
	measureNodeSel := node.NewRoundRobinSelector(data.TopicMeasureWrite.String(), metaSvc)
	measureNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicMeasureWrite, pipeline, measureNodeSel)
	metricSvc := observability.NewMetricService(metaSvc, pipeline, "liaison", measureNodeRegistry)

	streamNodeSel := node.NewRoundRobinSelector(data.TopicStreamWrite.String(), metaSvc)
	propertyNodeSel := node.NewRoundRobinSelector(data.TopicPropertyUpdate.String(), metaSvc)
	streamNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicStreamWrite, pipeline, streamNodeSel)
	propertyNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicPropertyUpdate, pipeline, propertyNodeSel)

	registries := grpc.NodeRegistries{
		MeasureNodeRegistry:  measureNodeRegistry,
		StreamNodeRegistry:   streamNodeRegistry,
		PropertyNodeRegistry: propertyNodeRegistry,
	}
	grpcServer := grpc.NewServer(bootCtx, pipeline, localPipeline, metaSvc, registries, metricSvc)

	profSvc := observability.NewProfService()
	httpServer := http.NewServer()

	dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline, metricSvc)
	if err != nil {
		bootLog.Fatal().Err(err).Msg("failed to initiate distributed query service")
	}

	// Caller-supplied runners come first, followed by the built-in units in
	// their registration order.
	builtin := []run.Unit{
		metaSvc,
		localPipeline,
		pipeline,
		measureNodeSel,
		streamNodeSel,
		propertyNodeSel,
		metricSvc,
		dQuery,
		grpcServer,
		httpServer,
		profSvc,
	}
	units := make([]run.Unit, 0, len(runners)+len(builtin))
	units = append(units, runners...)
	units = append(units, builtin...)

	liaisonGroup := run.NewGroup("liaison")
	liaisonGroup.Register(units...)

	// nodeSelector is bound to the --data-node-selector flag below and read
	// when the command actually runs.
	var nodeSelector string

	liaisonCmd := &cobra.Command{
		Use:     "liaison",
		Version: version.Build(),
		Short:   "Run as the liaison server",
		RunE: func(_ *cobra.Command, _ []string) error {
			runCtx := context.Background()

			if nodeSelector != "" {
				runCtx = context.WithValue(runCtx, common.ContextNodeSelectorKey, nodeSelector)
				labelSel, parseErr := pub.ParseLabelSelector(nodeSelector)
				if parseErr != nil {
					return parseErr
				}
				// Apply the same parsed selector to every write-path selector.
				measureNodeSel.SetNodeSelector(labelSel)
				streamNodeSel.SetNodeSelector(labelSel)
				propertyNodeSel.SetNodeSelector(labelSel)
			}

			self, genErr := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort())
			if genErr != nil {
				return genErr
			}

			logger.GetLogger().Info().Msg("starting as a liaison server")
			runCtx = context.WithValue(runCtx, common.ContextNodeKey, self)

			// Spawn our go routines and wait for shutdown.
			runErr := liaisonGroup.Run(runCtx)
			if runErr != nil {
				logger.GetLogger().Error().Err(runErr).Stack().Str("name", liaisonGroup.Name()).Msg("Exit")
				os.Exit(-1)
			}
			return nil
		},
	}

	liaisonCmd.Flags().AddFlagSet(liaisonGroup.RegisterFlags().FlagSet)
	liaisonCmd.PersistentFlags().StringVar(&nodeSelector, "data-node-selector", "",
		"the data node selector. e.g. key1=value1,key2=value2. If not set, all nodes are selected")

	return liaisonCmd
}