banyand/property/listener.go (233 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package property import ( "context" "fmt" "path" "path/filepath" "runtime/debug" "sync" "time" "google.golang.org/protobuf/proto" "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/query" ) var ( _ bus.MessageListener = (*updateListener)(nil) _ bus.MessageListener = (*deleteListener)(nil) _ bus.MessageListener = (*queryListener)(nil) ) type updateListener struct { s *service l *logger.Logger path string maxDiskUsagePercent int } func (h *updateListener) CheckHealth() *common.Error { if h.maxDiskUsagePercent < 1 { return common.NewErrorWithStatus(modelv1.Status_STATUS_DISK_FULL, "property is readonly because \"property-max-disk-usage-percent\" is 0") } diskPercent := observability.GetPathUsedPercent(h.path) if diskPercent < h.maxDiskUsagePercent { return nil } h.l.Warn().Int("maxPercent", h.maxDiskUsagePercent).Int("diskPercent", diskPercent).Msg("disk usage is too high, stop writing") return common.NewErrorWithStatus(modelv1.Status_STATUS_DISK_FULL, "disk usage is too high, stop writing") } func (h *updateListener) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { n := time.Now() now := n.UnixNano() var protoReq proto.Message defer func() { if err := recover(); err != nil { h.s.l.Error().Interface("err", err).RawJSON("req", logger.Proto(protoReq)).Str("stack", string(debug.Stack())).Msg("panic") resp = bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic: %v", err)) } }() d := message.Data().(*propertyv1.InternalUpdateRequest) if d == nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("request is nil")) return } protoReq = d if d.Property == nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("property is nil")) return } if d.Property.Tags == nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("tags is nil")) return } if len(d.Id) == 0 { resp = bus.NewMessage(bus.MessageID(now), common.NewError("id is empty")) return } err := h.s.db.update(ctx, common.ShardID(d.ShardId), d.Id, d.Property) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to update property: %v", err)) return } resp = bus.NewMessage(bus.MessageID(now), &propertyv1.ApplyResponse{ Created: true, TagsNum: uint32(len(d.Property.Tags)), }) return } type deleteListener struct { *bus.UnImplementedHealthyListener s *service } func (h *deleteListener) Rev(_ context.Context, message bus.Message) (resp bus.Message) { n := time.Now() now := n.UnixNano() var protoReq proto.Message defer func() { if err := recover(); err != nil { h.s.l.Error().Interface("err", err).RawJSON("req", logger.Proto(protoReq)).Str("stack", string(debug.Stack())).Msg("panic") resp = bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic: %v", err)) } }() d := message.Data().(*propertyv1.InternalDeleteRequest) if d == nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("request is nil")) return } protoReq = d if len(d.Ids) == 0 { resp = bus.NewMessage(bus.MessageID(now), common.NewError("id is empty")) return } err := h.s.db.delete(d.Ids) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to delete property: %v", err)) return } resp = bus.NewMessage(bus.MessageID(now), &propertyv1.DeleteResponse{ Deleted: true, }) return } type queryListener struct { *bus.UnImplementedHealthyListener s *service } func (h *queryListener) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { n := time.Now() now := n.UnixNano() var protoReq proto.Message defer func() { if err := recover(); err != nil { h.s.l.Error().Interface("err", err).RawJSON("req", logger.Proto(protoReq)).Str("stack", string(debug.Stack())).Msg("panic") resp = bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic: %v", err)) } }() d := message.Data().(*propertyv1.QueryRequest) if d == nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("request is nil")) return } protoReq = d if len(d.Groups) == 0 { resp = bus.NewMessage(bus.MessageID(now), common.NewError("groups is empty")) return } if d.Limit == 0 { resp = bus.NewMessage(bus.MessageID(now), common.NewError("limit is 0")) return } var tracer *query.Tracer var span *query.Span if d.Trace { tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano)) span, ctx = tracer.StartSpan(ctx, "data-%s", h.s.nodeID) span.Tag("req", string(logger.Proto(protoReq))) defer func() { span.Stop() }() } sources, err := h.s.db.query(ctx, d) if err != nil { if tracer != nil { span.Error(err) resp = bus.NewMessage(bus.MessageID(now), &propertyv1.InternalQueryResponse{ Trace: tracer.ToProto(), }) return } resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to query property: %v", err)) return } qResp := &propertyv1.InternalQueryResponse{ Sources: sources, } if tracer != nil { qResp.Trace = tracer.ToProto() } resp = bus.NewMessage(bus.MessageID(now), qResp) return } type snapshotListener struct { *bus.UnImplementedHealthyListener s *service snapshotSeq uint64 snapshotMux sync.Mutex } // Rev takes a snapshot of the database. func (s *snapshotListener) Rev(ctx context.Context, message bus.Message) bus.Message { groups := message.Data().([]*databasev1.SnapshotRequest_Group) var toTake bool if len(groups) == 0 { toTake = true } else { for _, g := range groups { if g.Catalog != commonv1.Catalog_CATALOG_PROPERTY { toTake = true break } } } if !toTake { return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil) } s.snapshotMux.Lock() defer s.snapshotMux.Unlock() storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum, s.s.lfs) sn := s.snapshotName() shardsRef := s.s.db.sLst.Load() if shardsRef == nil { return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil) } shards := *shardsRef for _, shard := range shards { select { case <-ctx.Done(): return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil) default: } snpDir := path.Join(s.s.snapshotDir, sn, storage.DataDir, filepath.Base(shard.location)) lfs.MkdirPanicIfExist(snpDir, storage.DirPerm) err := shard.store.TakeFileSnapshot(snpDir) if err != nil { s.s.l.Error().Err(err).Str("shard", filepath.Base(shard.location)).Msg("fail to take shard snapshot") return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &databasev1.Snapshot{ Name: sn, Catalog: commonv1.Catalog_CATALOG_PROPERTY, Error: err.Error(), }) } } return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &databasev1.Snapshot{ Name: sn, Catalog: commonv1.Catalog_CATALOG_PROPERTY, }) } func (s *snapshotListener) snapshotName() string { s.snapshotSeq++ return fmt.Sprintf("%s-%08X", time.Now().UTC().Format("20060102150405"), s.snapshotSeq) }