pkg/schema/init.go (159 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package schema
import (
"context"
"fmt"
"time"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
// initTimeout is the deadline applied to each per-group registry listing
// (index rules, index rule bindings, measures, streams) during initialization.
var initTimeout = 10 * time.Second
// revisionContext records, per kind of schema object, the highest
// ModRevision seen so far while the repository is being initialized.
type revisionContext struct {
	group, measure, stream               int64
	indexRule, indexRuleBinding, topNAgg int64
}

// String renders all tracked revisions on a single line, which makes the
// value usable as a fmt.Stringer in log output.
func (r revisionContext) String() string {
	const layout = "Group: %d, Measure: %d, Stream: %d, IndexRule: %d, IndexRuleBinding: %d, TopNAgg: %d"
	return fmt.Sprintf(layout, r.group, r.measure, r.stream, r.indexRule, r.indexRuleBinding, r.topNAgg)
}
// revisionContextKey is the private key type used to attach a
// *revisionContext to a context.Context.
type revisionContextKey struct{}
// revCtxKey is the context key under which Init stores the *revisionContext
// that the process* helpers read back.
var revCtxKey = revisionContextKey{}
// Init loads every group whose catalog matches kind from the group registry
// and, for each of them, its index rules, index rule bindings and measures
// or streams.
//
// Only schema.KindMeasure and schema.KindStream are handled; any other kind
// returns (nil, nil). Otherwise it returns the names of the processed groups
// and the highest ModRevision observed per schema kind, ordered as
//
//	measure: [group, measure, indexRuleBinding, indexRule, topNAgg]
//	stream:  [group, stream, indexRuleBinding, indexRule]
//
// A failure to list the groups is reported through logger.Panicf.
//
// NOTE(review): nothing visible in this file writes revCtx.topNAgg, so the
// last measure entry appears to always be zero — confirm it is set elsewhere.
// NOTE(review): ListGroup runs on context.Background() without the
// initTimeout deadline used by the per-group listings — confirm intended.
func (sr *schemaRepo) Init(kind schema.Kind) ([]string, []int64) {
	if kind != schema.KindMeasure && kind != schema.KindStream {
		return nil, nil
	}
	catalog := sr.getCatalog(kind)
	ctx := context.Background()
	groups, err := sr.metadata.GroupRegistry().ListGroup(ctx)
	if err != nil {
		logger.Panicf("fails to get the groups: %v", err)
		return nil, nil
	}
	var revCtx revisionContext
	// The tracker is shared by pointer, so one derived context serves every
	// group; build it once rather than allocating a new one per iteration.
	groupCtx := context.WithValue(ctx, revCtxKey, &revCtx)
	groupNames := make([]string, 0, len(groups))
	for _, g := range groups {
		if g.Catalog != catalog {
			continue
		}
		if g.Metadata.ModRevision > revCtx.group {
			revCtx.group = g.Metadata.ModRevision
		}
		sr.processGroup(groupCtx, g, catalog)
		groupNames = append(groupNames, g.Metadata.Name)
	}
	if kind == schema.KindMeasure {
		sr.l.Info().Stringer("revision", revCtx).Msg("init measures")
		return groupNames, []int64{revCtx.group, revCtx.measure, revCtx.indexRuleBinding, revCtx.indexRule, revCtx.topNAgg}
	}
	sr.l.Info().Stringer("revision", revCtx).Msg("init stream")
	return groupNames, []int64{revCtx.group, revCtx.stream, revCtx.indexRuleBinding, revCtx.indexRule}
}
// getCatalog translates a schema kind into its catalog: schema.KindMeasure
// maps to Catalog_CATALOG_MEASURE, every other kind to Catalog_CATALOG_STREAM.
func (sr *schemaRepo) getCatalog(kind schema.Kind) commonv1.Catalog {
	switch kind {
	case schema.KindMeasure:
		return commonv1.Catalog_CATALOG_MEASURE
	default:
		return commonv1.Catalog_CATALOG_STREAM
	}
}
// processGroup initializes group g and then loads, in order, its index
// rules, its index rule bindings and — depending on catalog — its measures
// or its streams. A failure to initialize the group is reported through
// logger.Panicf.
func (sr *schemaRepo) processGroup(ctx context.Context, g *commonv1.Group, catalog commonv1.Catalog) {
	if _, err := sr.initGroup(g); err != nil {
		logger.Panicf("fails to init the group: %v", err)
	}
	name := g.Metadata.GetName()
	sr.processRules(ctx, name)
	sr.processBindings(ctx, name)
	switch catalog {
	case commonv1.Catalog_CATALOG_MEASURE:
		sr.processMeasure(ctx, name)
	default:
		sr.processStream(ctx, name)
	}
}
// processBindings lists the index rule bindings of group gName within
// initTimeout, stores each one, and raises the indexRuleBinding revision of
// the *revisionContext carried by ctx to the highest ModRevision seen.
// A listing failure is reported through logger.Panicf.
func (sr *schemaRepo) processBindings(ctx context.Context, gName string) {
	listCtx, cancel := context.WithTimeout(ctx, initTimeout)
	defer cancel()
	begin := time.Now()
	opt := schema.ListOpt{Group: gName}
	bindings, err := sr.metadata.IndexRuleBindingRegistry().ListIndexRuleBinding(listCtx, opt)
	if err != nil {
		logger.Panicf("fails to get the index rule bindings: %v", err)
	}
	// ctx is expected to carry the tracker installed by Init; the assertion
	// panics otherwise.
	tracker := listCtx.Value(revCtxKey).(*revisionContext)
	for _, binding := range bindings {
		sr.storeIndexRuleBinding(binding)
		if rev := binding.Metadata.ModRevision; rev > tracker.indexRuleBinding {
			tracker.indexRuleBinding = rev
		}
	}
	elapsed := time.Since(begin)
	sr.l.Info().Str("group", gName).Dur("duration", elapsed).Int("size", len(bindings)).Msg("get index rule bindings")
}
// processRules lists the index rules of group gName within initTimeout,
// stores each one, and raises the indexRule revision of the
// *revisionContext carried by ctx to the highest ModRevision seen.
// A listing failure is reported through logger.Panicf.
func (sr *schemaRepo) processRules(ctx context.Context, gName string) {
	listCtx, cancel := context.WithTimeout(ctx, initTimeout)
	defer cancel()
	begin := time.Now()
	opt := schema.ListOpt{Group: gName}
	rules, err := sr.metadata.IndexRuleRegistry().ListIndexRule(listCtx, opt)
	if err != nil {
		logger.Panicf("fails to get the index rules: %v", err)
	}
	// ctx is expected to carry the tracker installed by Init; the assertion
	// panics otherwise.
	tracker := listCtx.Value(revCtxKey).(*revisionContext)
	for _, rule := range rules {
		sr.storeIndexRule(rule)
		if rev := rule.Metadata.ModRevision; rev > tracker.indexRule {
			tracker.indexRule = rev
		}
	}
	elapsed := time.Since(begin)
	sr.l.Info().Str("group", gName).Dur("duration", elapsed).Int("size", len(rules)).Msg("get index rules")
}
// processMeasure lists the measures of group gName within initTimeout and,
// for each one, first raises the measure revision of the *revisionContext
// carried by ctx and then stores the measure as a resource.
// Listing and storing failures are reported through logger.Panicf.
func (sr *schemaRepo) processMeasure(ctx context.Context, gName string) {
	listCtx, cancel := context.WithTimeout(ctx, initTimeout)
	defer cancel()
	begin := time.Now()
	opt := schema.ListOpt{Group: gName}
	measures, err := sr.metadata.MeasureRegistry().ListMeasure(listCtx, opt)
	if err != nil {
		logger.Panicf("fails to get the measures: %v", err)
		return
	}
	// ctx is expected to carry the tracker installed by Init; the assertion
	// panics otherwise.
	tracker := listCtx.Value(revCtxKey).(*revisionContext)
	for _, measure := range measures {
		if rev := measure.Metadata.ModRevision; rev > tracker.measure {
			tracker.measure = rev
		}
		storeErr := sr.storeResource(measure)
		if storeErr != nil {
			logger.Panicf("fails to store the measure: %v", storeErr)
		}
	}
	elapsed := time.Since(begin)
	sr.l.Info().Str("group", gName).Dur("duration", elapsed).Int("size", len(measures)).Msg("store measures")
}
// processStream lists the streams of group gName within initTimeout and,
// for each one, stores the stream as a resource and then raises the stream
// revision of the *revisionContext carried by ctx.
// Listing and storing failures are reported through logger.Panicf.
func (sr *schemaRepo) processStream(ctx context.Context, gName string) {
	listCtx, cancel := context.WithTimeout(ctx, initTimeout)
	defer cancel()
	begin := time.Now()
	opt := schema.ListOpt{Group: gName}
	streams, err := sr.metadata.StreamRegistry().ListStream(listCtx, opt)
	if err != nil {
		logger.Panicf("fails to get the streams: %v", err)
		return
	}
	// ctx is expected to carry the tracker installed by Init; the assertion
	// panics otherwise.
	tracker := listCtx.Value(revCtxKey).(*revisionContext)
	for _, stream := range streams {
		storeErr := sr.storeResource(stream)
		if storeErr != nil {
			logger.Panicf("fails to store the stream: %v", storeErr)
		}
		if rev := stream.Metadata.ModRevision; rev > tracker.stream {
			tracker.stream = rev
		}
	}
	elapsed := time.Since(begin)
	sr.l.Info().Str("group", gName).Dur("duration", elapsed).Int("size", len(streams)).Msg("store streams")
}
// initGroup returns the group already known under the schema's name, or
// creates a new one and initializes it from groupSchema. When initBySchema
// fails, it returns a nil group together with that error.
func (sr *schemaRepo) initGroup(groupSchema *commonv1.Group) (*group, error) {
	name := groupSchema.Metadata.Name
	if existing, found := sr.getGroup(name); found {
		return existing, nil
	}
	sr.l.Info().Str("group", name).Msg("creating a tsdb")
	created := sr.createGroup(name)
	if err := created.initBySchema(groupSchema); err != nil {
		return nil, err
	}
	return created, nil
}