banyand/metadata/allocator.go (60 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package metadata
import (
"context"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
var _ schema.EventHandler = (*allocator)(nil)
type allocator struct {
schemaRegistry schema.Registry
l *logger.Logger
}
func newAllocator(schemaRegistry schema.Registry, logger *logger.Logger) *allocator {
return &allocator{
schemaRegistry: schemaRegistry,
l: logger,
}
}
// OnAddOrUpdate implements EventHandler.
func (a *allocator) OnAddOrUpdate(metadata schema.Metadata) {
switch metadata.Kind {
case schema.KindGroup:
groupSchema := metadata.Spec.(*commonv1.Group)
if groupSchema.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED {
return
}
now := time.Now()
nowPb := timestamppb.New(now)
shardNum := groupSchema.GetResourceOpts().GetShardNum()
syncShard := func(id uint64) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return a.schemaRegistry.CreateOrUpdateShard(ctx, &databasev1.Shard{
Id: id,
Total: shardNum,
Metadata: &commonv1.Metadata{
Name: groupSchema.GetMetadata().GetName(),
},
Node: &databasev1.Node{
Id: "TODO",
CreatedAt: nowPb,
UpdatedAt: nowPb,
Addr: "TODO",
},
})
}
for i := 0; i < int(shardNum); i++ {
if err := syncShard(uint64(i)); err != nil {
// TODO: handle error. retry? or do a full sync?
a.l.Error().Err(err).Msg("failed to sync shard")
}
}
case schema.KindNode:
// TODO: handle node
default:
return
}
}
// OnDelete implements EventHandler.
func (*allocator) OnDelete(schema.Metadata) {
// TODO: handle delete
}