banyand/liaison/grpc/property.go (367 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, "property",
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package grpc
import (
"context"
"math"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
"go.uber.org/multierr"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
)
const defaultQueryTimeout = 10 * time.Second
type propertyServer struct {
propertyv1.UnimplementedPropertyServiceServer
schemaRegistry metadata.Repo
pipeline queue.Client
nodeRegistry NodeRegistry
metrics *metrics
}
func (ps *propertyServer) Apply(ctx context.Context, req *propertyv1.ApplyRequest) (resp *propertyv1.ApplyResponse, err error) {
property := req.Property
if property.Metadata == nil {
return nil, schema.BadRequest("metadata", "metadata should not be nil")
}
if property.Metadata.Group == "" {
return nil, schema.BadRequest("metadata.group", "group should not be nil")
}
if property.Metadata.Name == "" {
return nil, schema.BadRequest("metadata.name", "name should not be empty")
}
if property.Id == "" {
return nil, schema.BadRequest("id", "id should not be empty")
}
if len(property.Tags) == 0 {
return nil, schema.BadRequest("tags", "tags should not be empty")
}
g := req.Property.Metadata.Group
ps.metrics.totalStarted.Inc(1, g, "property", "apply")
start := time.Now()
defer func() {
ps.metrics.totalFinished.Inc(1, g, "property", "apply")
ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, "property", "apply")
if err != nil {
ps.metrics.totalErr.Inc(1, g, "property", "apply")
}
}()
var group *commonv1.Group
if group, err = ps.schemaRegistry.GroupRegistry().GetGroup(ctx, g); err != nil {
return nil, errors.Errorf("group %s not found", g)
}
if group.Catalog != commonv1.Catalog_CATALOG_PROPERTY {
return nil, errors.Errorf("group %s is not allowed to have properties", g)
}
if group.ResourceOpts == nil {
return nil, errors.Errorf("group %s has no resource options", g)
}
if group.ResourceOpts.ShardNum == 0 {
return nil, errors.Errorf("group %s has no shard number", g)
}
var propSchema *databasev1.Property
propSchema, err = ps.schemaRegistry.PropertyRegistry().GetProperty(ctx, &commonv1.Metadata{
Group: g,
Name: property.Metadata.Name,
})
if err != nil {
return nil, err
}
if len(propSchema.Tags) < len(property.Tags) {
return nil, errors.Errorf("property %s tags count mismatch", property.Metadata.Name)
}
for _, tag := range property.Tags {
found := false
for _, ts := range propSchema.Tags {
if ts.Name == tag.Key {
typ := databasev1.TagType(pbv1.MustTagValueToValueType(tag.Value))
if typ != databasev1.TagType_TAG_TYPE_UNSPECIFIED && ts.Type != typ {
return nil, errors.Errorf("property %s tag %s type mismatch", property.Metadata.Name, tag.Key)
}
found = true
}
}
if !found {
return nil, errors.Errorf("property %s tag %s not found", property.Metadata.Name, tag.Key)
}
}
qResp, err := ps.Query(ctx, &propertyv1.QueryRequest{
Groups: []string{g},
Name: property.Metadata.Name,
Ids: []string{property.Id},
})
if err != nil {
return nil, err
}
var prev *propertyv1.Property
if len(qResp.Properties) > 0 {
prev = qResp.Properties[0]
defer func() {
if err == nil {
var ids [][]byte
for _, p := range qResp.Properties {
ids = append(ids, getPropertyID(p))
}
if err = ps.remove(ids); err != nil {
err = multierr.Append(err, errors.New("fail to remove old properties"))
}
}
}()
}
entity := getEntity(property)
id, err := partition.ShardID(convert.StringToBytes(entity), group.ResourceOpts.ShardNum)
if err != nil {
return nil, err
}
node, err := ps.nodeRegistry.Locate(g, entity, uint32(id))
if err != nil {
return nil, err
}
if req.Strategy == propertyv1.ApplyRequest_STRATEGY_REPLACE {
return ps.replaceProperty(ctx, start, uint64(id), node, prev, property)
}
return ps.mergeProperty(ctx, start, uint64(id), node, prev, property)
}
func (ps *propertyServer) mergeProperty(ctx context.Context, now time.Time, shardID uint64, node string,
prev, cur *propertyv1.Property,
) (*propertyv1.ApplyResponse, error) {
if prev == nil {
return ps.replaceProperty(ctx, now, shardID, node, prev, cur)
}
tagCount, err := tagLen(prev)
if err != nil {
return nil, err
}
tags := make([]*modelv1.Tag, 0)
for i := 0; i < int(tagCount); i++ {
t := prev.Tags[i]
tagExisted := false
for _, et := range cur.Tags {
if et.Key == t.Key {
tagExisted = true
break
}
}
if !tagExisted {
tags = append(tags, t)
}
}
cur.Tags = append(cur.Tags, tags...)
return ps.replaceProperty(ctx, now, shardID, node, prev, cur)
}
func tagLen(property *propertyv1.Property) (uint32, error) {
tagsCount := len(property.Tags)
if tagsCount < 0 || uint64(tagsCount) > math.MaxUint32 {
return 0, errors.New("integer overflow: tags count exceeds uint32 range")
}
tagsNum := uint32(tagsCount)
return tagsNum, nil
}
func (ps *propertyServer) replaceProperty(ctx context.Context, now time.Time, shardID uint64, node string,
prev, cur *propertyv1.Property,
) (*propertyv1.ApplyResponse, error) {
ns := now.UnixNano()
if prev != nil {
cur.Metadata.CreateRevision = prev.Metadata.CreateRevision
} else {
cur.Metadata.CreateRevision = ns
}
cur.Metadata.ModRevision = ns
cur.UpdatedAt = timestamppb.New(now)
f, err := ps.pipeline.Publish(ctx, data.TopicPropertyUpdate, bus.NewMessageWithNode(bus.MessageID(time.Now().Unix()), node, &propertyv1.InternalUpdateRequest{
ShardId: shardID,
Id: getPropertyID(cur),
Property: cur,
}))
if err != nil {
return nil, err
}
if _, err := f.Get(); err != nil {
return nil, err
}
return &propertyv1.ApplyResponse{
Created: prev == nil,
TagsNum: uint32(len(cur.Tags)),
}, nil
}
func (ps *propertyServer) Delete(ctx context.Context, req *propertyv1.DeleteRequest) (resp *propertyv1.DeleteResponse, err error) {
if req.Group == "" {
return nil, schema.BadRequest("group", "group should not be nil")
}
if req.Name == "" {
return nil, schema.BadRequest("name", "name should not be nil")
}
g := req.Group
ps.metrics.totalStarted.Inc(1, g, "property", "delete")
start := time.Now()
defer func() {
ps.metrics.totalFinished.Inc(1, g, "property", "delete")
ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, "property", "delete")
if err != nil {
ps.metrics.totalErr.Inc(1, g, "property", "delete")
}
}()
qReq := &propertyv1.QueryRequest{
Groups: []string{g},
Name: req.Name,
}
if len(req.Id) > 0 {
qReq.Ids = []string{req.Id}
}
qResp, err := ps.Query(ctx, qReq)
if err != nil {
return nil, err
}
if len(qResp.Properties) == 0 {
return &propertyv1.DeleteResponse{Deleted: false}, nil
}
var ids [][]byte
for _, p := range qResp.Properties {
ids = append(ids, getPropertyID(p))
}
if err := ps.remove(ids); err != nil {
return nil, err
}
return &propertyv1.DeleteResponse{Deleted: true}, nil
}
func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryRequest) (resp *propertyv1.QueryResponse, err error) {
ps.metrics.totalStarted.Inc(1, "", "property", "query")
start := time.Now()
defer func() {
ps.metrics.totalFinished.Inc(1, "", "property", "query")
ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), "", "property", "query")
if err != nil {
ps.metrics.totalErr.Inc(1, "", "property", "query")
}
}()
if len(req.Groups) == 0 {
return nil, schema.BadRequest("groups", "groups should not be empty")
}
if req.Limit == 0 {
req.Limit = 100
}
var span *query.Span
if req.Trace {
tracer, _ := query.NewTracer(ctx, start.Format(time.RFC3339Nano))
span, _ = tracer.StartSpan(ctx, "property-grpc")
span.Tag("request", convert.BytesToString(logger.Proto(req)))
defer func() {
if err != nil {
span.Error(err)
} else {
resp.Trace = tracer.ToProto()
}
span.Stop()
}()
}
for _, gn := range req.Groups {
if g, getGroupErr := ps.schemaRegistry.GroupRegistry().GetGroup(ctx, gn); getGroupErr != nil {
return nil, errors.Errorf("group %s not found", gn)
} else if g.Catalog != commonv1.Catalog_CATALOG_PROPERTY {
return nil, errors.Errorf("group %s is not allowed to have properties", gn)
}
}
ff, err := ps.pipeline.Broadcast(defaultQueryTimeout, data.TopicPropertyQuery, bus.NewMessage(bus.MessageID(start.Unix()), req))
if err != nil {
return nil, err
}
res := make(map[string]*propertyv1.Property)
for _, f := range ff {
if m, getErr := f.Get(); getErr != nil {
err = multierr.Append(err, getErr)
} else {
d := m.Data()
if d == nil {
continue
}
switch v := d.(type) {
case *propertyv1.InternalQueryResponse:
for _, s := range v.Sources {
var p propertyv1.Property
err = protojson.Unmarshal(s, &p)
if err != nil {
return nil, err
}
entity := getEntity(&p)
cur, ok := res[entity]
if !ok {
res[entity] = &p
continue
}
if cur.Metadata.ModRevision < p.Metadata.ModRevision {
res[entity] = &p
err = ps.remove([][]byte{getPropertyID(cur)})
if err != nil {
return nil, err
}
}
}
if span != nil {
span.AddSubTrace(v.Trace)
}
case *common.Error:
err = multierr.Append(err, errors.New(v.Error()))
}
}
}
if err != nil {
return nil, err
}
if len(res) == 0 {
return &propertyv1.QueryResponse{Properties: nil}, nil
}
properties := make([]*propertyv1.Property, 0, len(res))
for _, p := range res {
if len(req.TagProjection) > 0 {
var tags []*modelv1.Tag
for _, tag := range p.Tags {
for _, tp := range req.TagProjection {
if tp == tag.Key {
tags = append(tags, tag)
break
}
}
}
p.Tags = tags
}
properties = append(properties, p)
if len(properties) >= int(req.Limit) {
break
}
}
return &propertyv1.QueryResponse{Properties: properties}, nil
}
func (ps *propertyServer) remove(ids [][]byte) error {
ff, err := ps.pipeline.Broadcast(defaultQueryTimeout, data.TopicPropertyDelete, bus.NewMessage(bus.MessageID(time.Now().Unix()), &propertyv1.InternalDeleteRequest{
Ids: ids,
}))
if err != nil {
return err
}
for _, f := range ff {
if _, err := f.Get(); err != nil {
return err
}
}
return nil
}
func getPropertyID(prop *propertyv1.Property) []byte {
return convert.StringToBytes(getEntity(prop) + "/" + strconv.FormatInt(prop.Metadata.ModRevision, 10))
}
func getEntity(prop *propertyv1.Property) string {
return strings.Join([]string{prop.Metadata.Group, prop.Metadata.Name, prop.Id}, "/")
}