banyand/stream/index.go (89 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package stream import ( "context" "path" "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) type elementIndex struct { store index.Store l *logger.Logger location string } func newElementIndex(ctx context.Context, root string, flushTimeoutSeconds int64, metrics *inverted.Metrics) (*elementIndex, error) { ei := &elementIndex{ l: logger.Fetch(ctx, "element_index"), location: path.Join(root, elementIndexFilename), } var err error if ei.store, err = inverted.NewStore(inverted.StoreOpts{ Path: ei.location, Logger: ei.l, BatchWaitSec: flushTimeoutSeconds, Metrics: metrics, }); err != nil { return nil, err } return ei, nil } func (e *elementIndex) Sort(ctx context.Context, sids []common.SeriesID, fieldKey index.FieldKey, order modelv1.Sort, timeRange *timestamp.TimeRange, preloadSize int, ) (index.FieldIterator[*index.DocumentResult], error) { iter, err := e.store.Sort(ctx, sids, fieldKey, order, timeRange, preloadSize) if err != nil { return nil, err } return iter, nil } func (e *elementIndex) Write(docs index.Documents) error { return e.store.Batch(index.Batch{ Documents: docs, }) } func (e *elementIndex) Search(ctx context.Context, seriesList []uint64, filter index.Filter, tr *index.RangeOpts) (posting.List, posting.List, error) { var result, resultTS posting.List for i, id := range seriesList { select { case <-ctx.Done(): return nil, nil, errors.WithMessagef(ctx.Err(), "search series %d/%d", i, len(seriesList)) default: } pl, plTS, err := filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) { return e.store, nil }, common.SeriesID(id), tr) if err != nil { return nil, nil, err } if pl == nil || pl.IsEmpty() { continue } if result == nil { result = pl } else { if err := result.Union(pl); err != nil { return nil, nil, err } } if resultTS == nil { resultTS = plTS } else { if err := resultTS.Union(plTS); err != nil { return nil, nil, err } } } return result, resultTS, nil } func (e *elementIndex) Close() error { return e.store.Close() } func (e *elementIndex) collectMetrics(labelValues ...string) { e.store.CollectMetrics(labelValues...) }