banyand/liaison/grpc/registry.go (441 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package grpc import ( "context" "errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" ) type streamRegistryServer struct { databasev1.UnimplementedStreamRegistryServiceServer schemaRegistry metadata.Repo } func (rs *streamRegistryServer) Create(ctx context.Context, req *databasev1.StreamRegistryServiceCreateRequest, ) (*databasev1.StreamRegistryServiceCreateResponse, error) { if err := rs.schemaRegistry.StreamRegistry().CreateStream(ctx, req.GetStream()); err != nil { return nil, err } return &databasev1.StreamRegistryServiceCreateResponse{}, nil } func (rs *streamRegistryServer) Update(ctx context.Context, req *databasev1.StreamRegistryServiceUpdateRequest, ) (*databasev1.StreamRegistryServiceUpdateResponse, error) { if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream()); err != nil { return nil, err } return &databasev1.StreamRegistryServiceUpdateResponse{}, nil } func (rs *streamRegistryServer) Delete(ctx context.Context, req *databasev1.StreamRegistryServiceDeleteRequest, ) (*databasev1.StreamRegistryServiceDeleteResponse, error) { ok, err := rs.schemaRegistry.StreamRegistry().DeleteStream(ctx, req.GetMetadata()) if err != nil { return nil, err } return &databasev1.StreamRegistryServiceDeleteResponse{ Deleted: ok, }, nil } func (rs *streamRegistryServer) Get(ctx context.Context, req *databasev1.StreamRegistryServiceGetRequest, ) (*databasev1.StreamRegistryServiceGetResponse, error) { entity, err := rs.schemaRegistry.StreamRegistry().GetStream(ctx, req.GetMetadata()) if err != nil { return nil, err } return &databasev1.StreamRegistryServiceGetResponse{ Stream: entity, }, nil } func (rs *streamRegistryServer) List(ctx context.Context, req *databasev1.StreamRegistryServiceListRequest, ) (*databasev1.StreamRegistryServiceListResponse, error) { entities, err := rs.schemaRegistry.StreamRegistry().ListStream(ctx, schema.ListOpt{Group: req.GetGroup()}) if err != nil { return nil, err } return &databasev1.StreamRegistryServiceListResponse{ Stream: entities, }, nil } func (rs *streamRegistryServer) Exist(ctx context.Context, req *databasev1.StreamRegistryServiceExistRequest) (*databasev1.StreamRegistryServiceExistResponse, error) { _, err := rs.Get(ctx, &databasev1.StreamRegistryServiceGetRequest{Metadata: req.Metadata}) if err == nil { return &databasev1.StreamRegistryServiceExistResponse{ HasGroup: true, HasStream: true, }, nil } exist, errGroup := groupExist(ctx, err, req.Metadata, rs.schemaRegistry.GroupRegistry()) if errGroup != nil { return nil, errGroup } return &databasev1.StreamRegistryServiceExistResponse{HasGroup: exist, HasStream: false}, nil } func groupExist(ctx context.Context, errResource error, metadata *commonv1.Metadata, groupRegistry schema.Group) (bool, error) { if !errors.Is(errResource, schema.ErrGRPCResourceNotFound) { return false, errResource } if metadata == nil { return false, status.Error(codes.InvalidArgument, "metadata is absent") } _, errGroup := groupRegistry.GetGroup(ctx, metadata.Group) if errGroup == nil { return true, nil } if errors.Is(errGroup, schema.ErrGRPCResourceNotFound) { return false, nil } return false, errGroup } type indexRuleBindingRegistryServer struct { databasev1.UnimplementedIndexRuleBindingRegistryServiceServer schemaRegistry metadata.Repo } func (rs *indexRuleBindingRegistryServer) Create(ctx context.Context, req *databasev1.IndexRuleBindingRegistryServiceCreateRequest) ( *databasev1.IndexRuleBindingRegistryServiceCreateResponse, error, ) { if err := rs.schemaRegistry.IndexRuleBindingRegistry().CreateIndexRuleBinding(ctx, req.GetIndexRuleBinding()); err != nil { return nil, err } return &databasev1.IndexRuleBindingRegistryServiceCreateResponse{}, nil } func (rs *indexRuleBindingRegistryServer) Update(ctx context.Context, req *databasev1.IndexRuleBindingRegistryServiceUpdateRequest) ( *databasev1.IndexRuleBindingRegistryServiceUpdateResponse, error, ) { if err := rs.schemaRegistry.IndexRuleBindingRegistry().UpdateIndexRuleBinding(ctx, req.GetIndexRuleBinding()); err != nil { return nil, err } return &databasev1.IndexRuleBindingRegistryServiceUpdateResponse{}, nil } func (rs *indexRuleBindingRegistryServer) Delete(ctx context.Context, req *databasev1.IndexRuleBindingRegistryServiceDeleteRequest) ( *databasev1.IndexRuleBindingRegistryServiceDeleteResponse, error, ) { ok, err := rs.schemaRegistry.IndexRuleBindingRegistry().DeleteIndexRuleBinding(ctx, req.GetMetadata()) if err != nil { return nil, err } return &databasev1.IndexRuleBindingRegistryServiceDeleteResponse{ Deleted: ok, }, nil } func (rs *indexRuleBindingRegistryServer) Get(ctx context.Context, req *databasev1.IndexRuleBindingRegistryServiceGetRequest) ( *databasev1.IndexRuleBindingRegistryServiceGetResponse, error, ) { entity, err := rs.schemaRegistry.IndexRuleBindingRegistry().GetIndexRuleBinding(ctx, req.GetMetadata()) if err != nil { return nil, err } return &databasev1.IndexRuleBindingRegistryServiceGetResponse{ IndexRuleBinding: entity, }, nil } func (rs *indexRuleBindingRegistryServer) List(ctx context.Context, req *databasev1.IndexRuleBindingRegistryServiceListRequest) ( *databasev1.IndexRuleBindingRegistryServiceListResponse, error, ) { entities, err := rs.schemaRegistry.IndexRuleBindingRegistry(). ListIndexRuleBinding(ctx, schema.ListOpt{Group: req.GetGroup()}) if err != nil { return nil, err } return &databasev1.IndexRuleBindingRegistryServiceListResponse{ IndexRuleBinding: entities, }, nil } func (rs *indexRuleBindingRegistryServer) Exist(ctx context.Context, req *databasev1.IndexRuleBindingRegistryServiceExistRequest) ( *databasev1.IndexRuleBindingRegistryServiceExistResponse, error, ) { _, err := rs.Get(ctx, &databasev1.IndexRuleBindingRegistryServiceGetRequest{Metadata: req.Metadata}) if err == nil { return &databasev1.IndexRuleBindingRegistryServiceExistResponse{ HasGroup: true, HasIndexRuleBinding: true, }, nil } exist, errGroup := groupExist(ctx, err, req.Metadata, rs.schemaRegistry.GroupRegistry()) if errGroup != nil { return nil, errGroup } return &databasev1.IndexRuleBindingRegistryServiceExistResponse{HasGroup: exist, HasIndexRuleBinding: false}, nil } type indexRuleRegistryServer struct { databasev1.UnimplementedIndexRuleRegistryServiceServer schemaRegistry metadata.Repo } func (rs *indexRuleRegistryServer) Create(ctx context.Context, req *databasev1.IndexRuleRegistryServiceCreateRequest) ( *databasev1.IndexRuleRegistryServiceCreateResponse, error, ) { if err := rs.schemaRegistry.IndexRuleRegistry().CreateIndexRule(ctx, req.GetIndexRule()); err != nil { return nil, err } return &databasev1.IndexRuleRegistryServiceCreateResponse{}, nil } func (rs *indexRuleRegistryServer) Update(ctx context.Context, req *databasev1.IndexRuleRegistryServiceUpdateRequest) ( *databasev1.IndexRuleRegistryServiceUpdateResponse, error, ) { if err := rs.schemaRegistry.IndexRuleRegistry().UpdateIndexRule(ctx, req.GetIndexRule()); err != nil { return nil, err } return &databasev1.IndexRuleRegistryServiceUpdateResponse{}, nil } func (rs *indexRuleRegistryServer) Delete(ctx context.Context, req *databasev1.IndexRuleRegistryServiceDeleteRequest) ( *databasev1.IndexRuleRegistryServiceDeleteResponse, error, ) { ok, err := rs.schemaRegistry.IndexRuleRegistry().DeleteIndexRule(ctx, req.GetMetadata()) if err != nil { return nil, err } return &databasev1.IndexRuleRegistryServiceDeleteResponse{ Deleted: ok, }, nil } func (rs *indexRuleRegistryServer) Get(ctx context.Context, req *databasev1.IndexRuleRegistryServiceGetRequest) ( *databasev1.IndexRuleRegistryServiceGetResponse, error, ) { entity, err := rs.schemaRegistry.IndexRuleRegistry().GetIndexRule(ctx, req.GetMetadata()) if err != nil { return nil, err } return &databasev1.IndexRuleRegistryServiceGetResponse{ IndexRule: entity, }, nil } func (rs *indexRuleRegistryServer) List(ctx context.Context, req *databasev1.IndexRuleRegistryServiceListRequest) ( *databasev1.IndexRuleRegistryServiceListResponse, error, ) { entities, err := rs.schemaRegistry.IndexRuleRegistry().ListIndexRule(ctx, schema.ListOpt{Group: req.GetGroup()}) if err != nil { return nil, err } return &databasev1.IndexRuleRegistryServiceListResponse{ IndexRule: entities, }, nil } func (rs *indexRuleRegistryServer) Exist(ctx context.Context, req *databasev1.IndexRuleRegistryServiceExistRequest) ( *databasev1.IndexRuleRegistryServiceExistResponse, error, ) { _, err := rs.Get(ctx, &databasev1.IndexRuleRegistryServiceGetRequest{Metadata: req.Metadata}) if err == nil { return &databasev1.IndexRuleRegistryServiceExistResponse{ HasGroup: true, HasIndexRule: true, }, nil } exist, errGroup := groupExist(ctx, err, req.Metadata, rs.schemaRegistry.GroupRegistry()) if errGroup != nil { return nil, errGroup } return &databasev1.IndexRuleRegistryServiceExistResponse{HasGroup: exist, HasIndexRule: false}, nil } type measureRegistryServer struct { databasev1.UnimplementedMeasureRegistryServiceServer schemaRegistry metadata.Repo } func (rs *measureRegistryServer) Create(ctx context.Context, req *databasev1.MeasureRegistryServiceCreateRequest) ( *databasev1.MeasureRegistryServiceCreateResponse, error, ) { if err := rs.schemaRegistry.MeasureRegistry().CreateMeasure(ctx, req.GetMeasure()); err != nil { return nil, err } return &databasev1.MeasureRegistryServiceCreateResponse{}, nil } func (rs *measureRegistryServer) Update(ctx context.Context, req *databasev1.MeasureRegistryServiceUpdateRequest) ( *databasev1.MeasureRegistryServiceUpdateResponse, error, ) { if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure()); err != nil { return nil, err } return &databasev1.MeasureRegistryServiceUpdateResponse{}, nil } func (rs *measureRegistryServer) Delete(ctx context.Context, req *databasev1.MeasureRegistryServiceDeleteRequest) ( *databasev1.MeasureRegistryServiceDeleteResponse, error, ) { ok, err := rs.schemaRegistry.MeasureRegistry().DeleteMeasure(ctx, req.GetMetadata()) if err != nil { return nil, err } return &databasev1.MeasureRegistryServiceDeleteResponse{ Deleted: ok, }, nil } func (rs *measureRegistryServer) Get(ctx context.Context, req *databasev1.MeasureRegistryServiceGetRequest) ( *databasev1.MeasureRegistryServiceGetResponse, error, ) { entity, err := rs.schemaRegistry.MeasureRegistry().GetMeasure(ctx, req.GetMetadata()) if err != nil { return nil, err } return &databasev1.MeasureRegistryServiceGetResponse{ Measure: entity, }, nil } func (rs *measureRegistryServer) List(ctx context.Context, req *databasev1.MeasureRegistryServiceListRequest) ( *databasev1.MeasureRegistryServiceListResponse, error, ) { entities, err := rs.schemaRegistry.MeasureRegistry().ListMeasure(ctx, schema.ListOpt{Group: req.GetGroup()}) if err != nil { return nil, err } return &databasev1.MeasureRegistryServiceListResponse{ Measure: entities, }, nil } func (rs *measureRegistryServer) Exist(ctx context.Context, req *databasev1.MeasureRegistryServiceExistRequest) (*databasev1.MeasureRegistryServiceExistResponse, error) { _, err := rs.Get(ctx, &databasev1.MeasureRegistryServiceGetRequest{Metadata: req.Metadata}) if err == nil { return &databasev1.MeasureRegistryServiceExistResponse{ HasGroup: true, HasMeasure: true, }, nil } exist, errGroup := groupExist(ctx, err, req.Metadata, rs.schemaRegistry.GroupRegistry()) if errGroup != nil { return nil, errGroup } return &databasev1.MeasureRegistryServiceExistResponse{HasGroup: exist, HasMeasure: false}, nil } type groupRegistryServer struct { databasev1.UnimplementedGroupRegistryServiceServer schemaRegistry metadata.Repo } func (rs *groupRegistryServer) Create(ctx context.Context, req *databasev1.GroupRegistryServiceCreateRequest) ( *databasev1.GroupRegistryServiceCreateResponse, error, ) { if err := rs.schemaRegistry.GroupRegistry().CreateGroup(ctx, req.GetGroup()); err != nil { return nil, err } return &databasev1.GroupRegistryServiceCreateResponse{}, nil } func (rs *groupRegistryServer) Update(ctx context.Context, req *databasev1.GroupRegistryServiceUpdateRequest) ( *databasev1.GroupRegistryServiceUpdateResponse, error, ) { if err := rs.schemaRegistry.GroupRegistry().UpdateGroup(ctx, req.GetGroup()); err != nil { return nil, err } return &databasev1.GroupRegistryServiceUpdateResponse{}, nil } func (rs *groupRegistryServer) Delete(ctx context.Context, req *databasev1.GroupRegistryServiceDeleteRequest) ( *databasev1.GroupRegistryServiceDeleteResponse, error, ) { deleted, err := rs.schemaRegistry.GroupRegistry().DeleteGroup(ctx, req.GetGroup()) if err != nil { return nil, err } return &databasev1.GroupRegistryServiceDeleteResponse{ Deleted: deleted, }, nil } func (rs *groupRegistryServer) Get(ctx context.Context, req *databasev1.GroupRegistryServiceGetRequest) ( *databasev1.GroupRegistryServiceGetResponse, error, ) { g, err := rs.schemaRegistry.GroupRegistry().GetGroup(ctx, req.GetGroup()) if err != nil { return nil, err } return &databasev1.GroupRegistryServiceGetResponse{ Group: g, }, nil } func (rs *groupRegistryServer) List(ctx context.Context, _ *databasev1.GroupRegistryServiceListRequest) ( *databasev1.GroupRegistryServiceListResponse, error, ) { groups, err := rs.schemaRegistry.GroupRegistry().ListGroup(ctx) if err != nil { return nil, err } return &databasev1.GroupRegistryServiceListResponse{ Group: groups, }, nil } func (rs *groupRegistryServer) Exist(ctx context.Context, req *databasev1.GroupRegistryServiceExistRequest) (*databasev1.GroupRegistryServiceExistResponse, error) { _, err := rs.Get(ctx, &databasev1.GroupRegistryServiceGetRequest{Group: req.Group}) if err == nil { return &databasev1.GroupRegistryServiceExistResponse{ HasGroup: true, }, nil } if errors.Is(err, schema.ErrGRPCResourceNotFound) { return &databasev1.GroupRegistryServiceExistResponse{ HasGroup: false, }, nil } return nil, err } type topNAggregationRegistryServer struct { databasev1.UnimplementedTopNAggregationRegistryServiceServer schemaRegistry metadata.Repo } func (ts *topNAggregationRegistryServer) Create(ctx context.Context, req *databasev1.TopNAggregationRegistryServiceCreateRequest, ) (*databasev1.TopNAggregationRegistryServiceCreateResponse, error) { if err := ts.schemaRegistry.TopNAggregationRegistry().CreateTopNAggregation(ctx, req.GetTopNAggregation()); err != nil { return nil, err } return &databasev1.TopNAggregationRegistryServiceCreateResponse{}, nil } func (ts *topNAggregationRegistryServer) Update(ctx context.Context, req *databasev1.TopNAggregationRegistryServiceUpdateRequest, ) (*databasev1.TopNAggregationRegistryServiceUpdateResponse, error) { if err := ts.schemaRegistry.TopNAggregationRegistry().UpdateTopNAggregation(ctx, req.GetTopNAggregation()); err != nil { return nil, err } return &databasev1.TopNAggregationRegistryServiceUpdateResponse{}, nil } func (ts *topNAggregationRegistryServer) Delete(ctx context.Context, req *databasev1.TopNAggregationRegistryServiceDeleteRequest, ) (*databasev1.TopNAggregationRegistryServiceDeleteResponse, error) { ok, err := ts.schemaRegistry.TopNAggregationRegistry().DeleteTopNAggregation(ctx, req.GetMetadata()) if err != nil { return nil, err } return &databasev1.TopNAggregationRegistryServiceDeleteResponse{ Deleted: ok, }, nil } func (ts *topNAggregationRegistryServer) Get(ctx context.Context, req *databasev1.TopNAggregationRegistryServiceGetRequest, ) (*databasev1.TopNAggregationRegistryServiceGetResponse, error) { entity, err := ts.schemaRegistry.TopNAggregationRegistry().GetTopNAggregation(ctx, req.GetMetadata()) if err != nil { return nil, err } return &databasev1.TopNAggregationRegistryServiceGetResponse{ TopNAggregation: entity, }, nil } func (ts *topNAggregationRegistryServer) List(ctx context.Context, req *databasev1.TopNAggregationRegistryServiceListRequest, ) (*databasev1.TopNAggregationRegistryServiceListResponse, error) { entities, err := ts.schemaRegistry.TopNAggregationRegistry().ListTopNAggregation(ctx, schema.ListOpt{Group: req.GetGroup()}) if err != nil { return nil, err } return &databasev1.TopNAggregationRegistryServiceListResponse{ TopNAggregation: entities, }, nil } func (ts *topNAggregationRegistryServer) Exist(ctx context.Context, req *databasev1.TopNAggregationRegistryServiceExistRequest) ( *databasev1.TopNAggregationRegistryServiceExistResponse, error, ) { _, err := ts.Get(ctx, &databasev1.TopNAggregationRegistryServiceGetRequest{Metadata: req.Metadata}) if err == nil { return &databasev1.TopNAggregationRegistryServiceExistResponse{ HasGroup: true, HasTopNAggregation: true, }, nil } exist, errGroup := groupExist(ctx, err, req.Metadata, ts.schemaRegistry.GroupRegistry()) if errGroup != nil { return nil, errGroup } return &databasev1.TopNAggregationRegistryServiceExistResponse{HasGroup: exist, HasTopNAggregation: false}, nil }