banyand/dquery/dquery.go (151 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Package dquery implement the distributed query. package dquery import ( "context" "errors" "strings" "time" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/stream" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/schema" ) const ( moduleName = "distributed-query" hotStageName = "hot" ) var ( _ run.Service = (*queryService)(nil) distributedQueryScope = observability.RootScope.SubScope("dquery") streamScope = distributedQueryScope.SubScope("stream") measureScope = distributedQueryScope.SubScope("measure") ) type queryService struct { metaService metadata.Repo pipeline queue.Server omr observability.MetricsRegistry log *logger.Logger sqp *streamQueryProcessor mqp *measureQueryProcessor tqp *topNQueryProcessor closer *run.Closer nodeID string hotStageNodeSelector string slowQuery time.Duration } // NewService return a new query service. func NewService(metaService metadata.Repo, pipeline queue.Server, broadcaster bus.Broadcaster, omr observability.MetricsRegistry, ) (run.Unit, error) { svc := &queryService{ metaService: metaService, closer: run.NewCloser(1), pipeline: pipeline, omr: omr, } svc.sqp = &streamQueryProcessor{ queryService: svc, broadcaster: broadcaster, } svc.mqp = &measureQueryProcessor{ queryService: svc, broadcaster: broadcaster, } svc.tqp = &topNQueryProcessor{ queryService: svc, broadcaster: broadcaster, } return svc, nil } func (q *queryService) Name() string { return moduleName } func (q *queryService) FlagSet() *run.FlagSet { fs := run.NewFlagSet("distributed-query") fs.DurationVar(&q.slowQuery, "dst-slow-query", 5*time.Second, "distributed slow query threshold, 0 means no slow query log") return fs } func (q *queryService) Validate() error { return nil } func (q *queryService) PreRun(ctx context.Context) error { val := ctx.Value(common.ContextNodeKey) if val == nil { return errors.New("node id is empty") } node := val.(common.Node) q.nodeID = node.NodeID val = ctx.Value(common.ContextNodeSelectorKey) if val != nil { q.hotStageNodeSelector = val.(string) } q.log = logger.GetLogger(moduleName) q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log, schema.NewMetrics(q.omr.With(streamScope))) q.mqp.measureService = measure.NewPortableRepository(q.metaService, q.log, schema.NewMetrics(q.omr.With(measureScope))) q.tqp.measureService = q.mqp.measureService return multierr.Combine( q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp), q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp), q.pipeline.Subscribe(data.TopicTopNQuery, q.tqp), ) } func (q *queryService) GracefulStop() { q.sqp.streamService.Close() q.mqp.measureService.Close() q.closer.Done() q.closer.CloseThenWait() } func (q *queryService) Serve() run.StopNotify { return q.closer.CloseNotify() } func (q *queryService) parseNodeSelector(stages []string, resource *commonv1.ResourceOpts) ([]string, bool) { if len(stages) == 0 { stages = resource.DefaultStages } if len(stages) == 0 { return nil, false } var nodeSelectors []string for _, sn := range stages { for _, stage := range resource.Stages { if strings.EqualFold(sn, stage.Name) { ns := stage.NodeSelector ns = strings.TrimSpace(ns) if ns == "" { continue } nodeSelectors = append(nodeSelectors, ns) break } } if strings.EqualFold(sn, hotStageName) && q.hotStageNodeSelector != "" { nodeSelectors = append(nodeSelectors, q.hotStageNodeSelector) } } if len(nodeSelectors) == 0 { return nil, false } return nodeSelectors, true } var _ executor.DistributedExecutionContext = (*distributedContext)(nil) type distributedContext struct { bus.Broadcaster timeRange *modelv1.TimeRange nodeSelectors map[string][]string } func (dc *distributedContext) TimeRange() *modelv1.TimeRange { return dc.timeRange } func (dc *distributedContext) NodeSelectors() map[string][]string { return dc.nodeSelectors }