pkg/cmdsetup/liaison.go (97 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package cmdsetup
import (
"context"
"os"
"github.com/spf13/cobra"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
"github.com/apache/skywalking-banyandb/banyand/dquery"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/liaison/http"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/queue/pub"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/node"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/version"
)
func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
l := logger.GetLogger("bootstrap")
ctx := context.Background()
metaSvc, err := metadata.NewClient(true, false)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate metadata service")
}
pipeline := pub.New(metaSvc)
localPipeline := queue.Local()
measureNodeSel := node.NewRoundRobinSelector(data.TopicMeasureWrite.String(), metaSvc)
measureNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicMeasureWrite, pipeline, measureNodeSel)
metricSvc := observability.NewMetricService(metaSvc, pipeline, "liaison", measureNodeRegistry)
streamNodeSel := node.NewRoundRobinSelector(data.TopicStreamWrite.String(), metaSvc)
propertyNodeSel := node.NewRoundRobinSelector(data.TopicPropertyUpdate.String(), metaSvc)
grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, grpc.NodeRegistries{
MeasureNodeRegistry: measureNodeRegistry,
StreamNodeRegistry: grpc.NewClusterNodeRegistry(data.TopicStreamWrite, pipeline, streamNodeSel),
PropertyNodeRegistry: grpc.NewClusterNodeRegistry(data.TopicPropertyUpdate, pipeline, propertyNodeSel),
}, metricSvc)
profSvc := observability.NewProfService()
httpServer := http.NewServer()
dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline, metricSvc)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate distributed query service")
}
var units []run.Unit
units = append(units, runners...)
units = append(units,
metaSvc,
localPipeline,
pipeline,
measureNodeSel,
streamNodeSel,
propertyNodeSel,
metricSvc,
dQuery,
grpcServer,
httpServer,
profSvc,
)
liaisonGroup := run.NewGroup("liaison")
liaisonGroup.Register(units...)
var nodeSelector string
liaisonCmd := &cobra.Command{
Use: "liaison",
Version: version.Build(),
Short: "Run as the liaison server",
RunE: func(_ *cobra.Command, _ []string) (err error) {
ctx := context.Background()
if nodeSelector != "" {
ctx = context.WithValue(ctx, common.ContextNodeSelectorKey, nodeSelector)
var ls *pub.LabelSelector
ls, err = pub.ParseLabelSelector(nodeSelector)
if err != nil {
return err
}
for _, sel := range []node.Selector{measureNodeSel, streamNodeSel, propertyNodeSel} {
sel.SetNodeSelector(ls)
}
}
node, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort())
if err != nil {
return err
}
logger.GetLogger().Info().Msg("starting as a liaison server")
ctx = context.WithValue(ctx, common.ContextNodeKey, node)
// Spawn our go routines and wait for shutdown.
if err := liaisonGroup.Run(ctx); err != nil {
logger.GetLogger().Error().Err(err).Stack().Str("name", liaisonGroup.Name()).Msg("Exit")
os.Exit(-1)
}
return nil
},
}
liaisonCmd.Flags().AddFlagSet(liaisonGroup.RegisterFlags().FlagSet)
liaisonCmd.PersistentFlags().StringVar(&nodeSelector, "data-node-selector", "",
"the data node selector. e.g. key1=value1,key2=value2. If not set, all nodes are selected")
return liaisonCmd
}