pkg/node/round_robin.go (198 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package node import ( "context" "encoding/json" "fmt" "slices" "sort" "strings" "sync" "time" "github.com/pkg/errors" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/queue/pub" "github.com/apache/skywalking-banyandb/pkg/convert" ) type roundRobinSelector struct { schemaRegistry metadata.Repo nodeSelector *pub.LabelSelector name string lookupTable []key nodes []string mu sync.RWMutex } func (r *roundRobinSelector) String() string { r.mu.RLock() defer r.mu.RUnlock() result := make(map[string]string) for _, entry := range r.lookupTable { n, err := r.Pick(entry.group, "", entry.shardID) key := fmt.Sprintf("%s-%d", entry.group, entry.shardID) if err != nil { result[key] = fmt.Sprintf("%v", err) continue } result[key] = n } if len(result) < 1 { return "" } jsonBytes, err := json.Marshal(result) if err != nil { return fmt.Sprintf("%v", err) } return convert.BytesToString(jsonBytes) } // NewRoundRobinSelector creates a new round-robin selector. func NewRoundRobinSelector(name string, schemaRegistry metadata.Repo) Selector { rrs := &roundRobinSelector{ name: name, nodes: make([]string, 0), schemaRegistry: schemaRegistry, lookupTable: make([]key, 0), } return rrs } func (r *roundRobinSelector) Name() string { return r.name } func (r *roundRobinSelector) SetNodeSelector(selector *pub.LabelSelector) { r.nodeSelector = selector } func (r *roundRobinSelector) PreRun(context.Context) error { r.schemaRegistry.RegisterHandler(r.name, schema.KindGroup, r) return nil } func (r *roundRobinSelector) OnAddOrUpdate(schemaMetadata schema.Metadata) { if schemaMetadata.Kind != schema.KindGroup { return } group, ok := schemaMetadata.Spec.(*commonv1.Group) if !ok || !validateGroup(group) { return } r.mu.Lock() defer r.mu.Unlock() r.removeGroup(group.Metadata.Name) for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ { k := key{group: group.Metadata.Name, shardID: i} r.lookupTable = append(r.lookupTable, k) } r.sortEntries() } func (r *roundRobinSelector) removeGroup(group string) { for i := 0; i < len(r.lookupTable); { if r.lookupTable[i].group == group { copy(r.lookupTable[i:], r.lookupTable[i+1:]) r.lookupTable = r.lookupTable[:len(r.lookupTable)-1] } else { i++ } } } func (r *roundRobinSelector) OnDelete(schemaMetadata schema.Metadata) { if schemaMetadata.Kind != schema.KindGroup { return } r.mu.Lock() defer r.mu.Unlock() group := schemaMetadata.Spec.(*commonv1.Group) r.removeGroup(group.Metadata.Name) } func (r *roundRobinSelector) OnInit(kinds []schema.Kind) (bool, []int64) { if len(kinds) != 1 { return false, nil } if kinds[0] != schema.KindGroup { return false, nil } ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() gg, err := r.schemaRegistry.GroupRegistry().ListGroup(ctx) if err != nil { panic(fmt.Sprintf("failed to list group: %v", err)) } r.mu.Lock() defer r.mu.Unlock() var revision int64 r.lookupTable = r.lookupTable[:0] for _, g := range gg { if !validateGroup(g) { continue } if g.Metadata.ModRevision > revision { revision = g.Metadata.ModRevision } for i := uint32(0); i < g.ResourceOpts.ShardNum; i++ { k := key{group: g.Metadata.Name, shardID: i} r.lookupTable = append(r.lookupTable, k) } } r.sortEntries() return true, []int64{revision} } func (r *roundRobinSelector) AddNode(node *databasev1.Node) { if r.nodeSelector != nil && !r.nodeSelector.Matches(node.Labels) { return } r.mu.Lock() defer r.mu.Unlock() r.nodes = append(r.nodes, node.Metadata.Name) sort.StringSlice(r.nodes).Sort() } func (r *roundRobinSelector) RemoveNode(node *databasev1.Node) { r.mu.Lock() defer r.mu.Unlock() for i, n := range r.nodes { if n == node.Metadata.Name { r.nodes = append(r.nodes[:i], r.nodes[i+1:]...) break } } } func (r *roundRobinSelector) Pick(group, _ string, shardID uint32) (string, error) { r.mu.RLock() defer r.mu.RUnlock() k := key{group: group, shardID: shardID} if len(r.nodes) == 0 { return "", errors.New("no nodes available") } i := sort.Search(len(r.lookupTable), func(i int) bool { if r.lookupTable[i].group == group { return r.lookupTable[i].shardID >= shardID } return r.lookupTable[i].group > group }) if i < len(r.lookupTable) && r.lookupTable[i] == k { return r.selectNode(i), nil } return "", fmt.Errorf("%s-%d is a unknown shard", group, shardID) } func (r *roundRobinSelector) sortEntries() { slices.SortFunc(r.lookupTable, func(a, b key) int { n := strings.Compare(a.group, b.group) if n != 0 { return n } return int(a.shardID) - int(b.shardID) }) } func (r *roundRobinSelector) selectNode(entry any) string { index := entry.(int) return r.nodes[index%len(r.nodes)] } func validateGroup(group *commonv1.Group) bool { if group.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED { return false } if group.ResourceOpts == nil { return false } return true } type key struct { group string shardID uint32 }