banyand/query/query.go (71 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Package query implement the query module for liaison and other modules to retrieve data. package query import ( "context" "errors" "time" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/stream" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" ) type queryService struct { metaService metadata.Repo pipeline queue.Server log *logger.Logger sqp *streamQueryProcessor mqp *measureQueryProcessor tqp *topNQueryProcessor nodeID string slowQuery time.Duration } // NewService return a new query service. func NewService(_ context.Context, streamService stream.Service, measureService measure.Service, metaService metadata.Repo, pipeline queue.Server, ) (run.Unit, error) { svc := &queryService{ metaService: metaService, pipeline: pipeline, } // measure query processor svc.mqp = &measureQueryProcessor{ measureService: measureService, queryService: svc, } // stream query processor svc.sqp = &streamQueryProcessor{ streamService: streamService, queryService: svc, } // topN query processor svc.tqp = &topNQueryProcessor{ measureService: measureService, queryService: svc, } return svc, nil } func (q *queryService) Name() string { return moduleName } func (q *queryService) PreRun(ctx context.Context) error { val := ctx.Value(common.ContextNodeKey) if val == nil { return errors.New("node id is empty") } node := val.(common.Node) q.nodeID = node.NodeID q.log = logger.GetLogger(moduleName) return multierr.Combine( q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp), q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp), q.pipeline.Subscribe(data.TopicTopNQuery, q.tqp), ) } func (q *queryService) FlagSet() *run.FlagSet { fs := run.NewFlagSet("query") fs.DurationVar(&q.slowQuery, "slow-query", 0, "slow query threshold, 0 means no slow query log") return fs } func (q *queryService) Validate() error { return nil }