banyand/liaison/grpc/registry.go (881 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package grpc
import (
"context"
"errors"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
)
// streamRegistryServer holds the state used by the stream registry handlers
// defined on it: the schema repository they delegate to and the metrics they
// update on every call.
type streamRegistryServer struct {
// Embedded generated base type for the stream registry service.
databasev1.UnimplementedStreamRegistryServiceServer
// schemaRegistry provides the stream and group registries the handlers call.
schemaRegistry metadata.Repo
// metrics carries the started/finished/latency/error registry metrics.
metrics *metrics
}
// Create hands the stream carried by req to the stream registry and returns
// the modification revision reported by the registry.
//
// The metrics group label is read through the generated nil-safe getters, so a
// request whose stream or stream metadata is absent yields an empty label
// instead of a nil-pointer panic in this handler; the request is still passed
// to the registry as-is. Started, finished and latency metrics are always
// recorded; a registry error is counted and returned unchanged.
func (rs *streamRegistryServer) Create(ctx context.Context,
	req *databasev1.StreamRegistryServiceCreateRequest,
) (*databasev1.StreamRegistryServiceCreateResponse, error) {
	g := req.GetStream().GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "create")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "create")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream", "create")
	}()
	modRevision, err := rs.schemaRegistry.StreamRegistry().CreateStream(ctx, req.GetStream())
	if err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "stream", "create")
		return nil, err
	}
	return &databasev1.StreamRegistryServiceCreateResponse{
		ModRevision: modRevision,
	}, nil
}
// Update hands the stream carried by req to the stream registry's update call
// and returns the modification revision reported by the registry.
//
// The metrics group label is read through the generated nil-safe getters, so a
// request whose stream or stream metadata is absent yields an empty label
// instead of a nil-pointer panic in this handler; the request is still passed
// to the registry as-is. Started, finished and latency metrics are always
// recorded; a registry error is counted and returned unchanged.
func (rs *streamRegistryServer) Update(ctx context.Context,
	req *databasev1.StreamRegistryServiceUpdateRequest,
) (*databasev1.StreamRegistryServiceUpdateResponse, error) {
	g := req.GetStream().GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "update")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "update")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream", "update")
	}()
	modRevision, err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream())
	if err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "stream", "update")
		return nil, err
	}
	return &databasev1.StreamRegistryServiceUpdateResponse{
		ModRevision: modRevision,
	}, nil
}
// Delete asks the stream registry to delete the stream identified by the
// request metadata and returns the registry's boolean result in Deleted.
//
// The metrics group label is read through the generated nil-safe getters, so
// absent metadata yields an empty label instead of a nil-pointer panic in this
// handler. Started, finished and latency metrics are always recorded; a
// registry error is counted and returned unchanged.
func (rs *streamRegistryServer) Delete(ctx context.Context,
	req *databasev1.StreamRegistryServiceDeleteRequest,
) (*databasev1.StreamRegistryServiceDeleteResponse, error) {
	g := req.GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "delete")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "delete")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream", "delete")
	}()
	ok, err := rs.schemaRegistry.StreamRegistry().DeleteStream(ctx, req.GetMetadata())
	if err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "stream", "delete")
		return nil, err
	}
	return &databasev1.StreamRegistryServiceDeleteResponse{
		Deleted: ok,
	}, nil
}
// Get fetches the stream identified by the request metadata from the stream
// registry and returns it in the response.
//
// The metrics group label is read through the generated nil-safe getters, so
// absent metadata yields an empty label instead of a nil-pointer panic in this
// handler. Started, finished and latency metrics are always recorded; a
// registry error is counted and returned unchanged.
func (rs *streamRegistryServer) Get(ctx context.Context,
	req *databasev1.StreamRegistryServiceGetRequest,
) (*databasev1.StreamRegistryServiceGetResponse, error) {
	g := req.GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "get")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "get")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream", "get")
	}()
	entity, err := rs.schemaRegistry.StreamRegistry().GetStream(ctx, req.GetMetadata())
	if err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "stream", "get")
		return nil, err
	}
	return &databasev1.StreamRegistryServiceGetResponse{
		Stream: entity,
	}, nil
}
// List returns the streams the stream registry lists for the group named in
// req.
//
// Started, finished and latency metrics are recorded for every call with the
// labels (group, "stream", "list"); a registry error is counted and returned
// unchanged.
func (rs *streamRegistryServer) List(ctx context.Context,
	req *databasev1.StreamRegistryServiceListRequest,
) (*databasev1.StreamRegistryServiceListResponse, error) {
	const kind, op = "stream", "list"
	group := req.Group
	rs.metrics.totalRegistryStarted.Inc(1, group, kind, op)
	begin := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, group, kind, op)
		rs.metrics.totalRegistryLatency.Inc(time.Since(begin).Seconds(), group, kind, op)
	}()
	opt := schema.ListOpt{Group: req.GetGroup()}
	streams, listErr := rs.schemaRegistry.StreamRegistry().ListStream(ctx, opt)
	if listErr != nil {
		rs.metrics.totalRegistryErr.Inc(1, group, kind, op)
		return nil, listErr
	}
	resp := &databasev1.StreamRegistryServiceListResponse{Stream: streams}
	return resp, nil
}
// Exist reports whether a stream and its group exist.
//
// The lookup is delegated to Get (which records its own "get" metrics). When
// Get succeeds both HasGroup and HasStream are true. When Get fails,
// groupExist is consulted with that error: an error from groupExist is counted
// and returned, otherwise HasGroup carries its result and HasStream is false.
//
// The request metadata is read through the generated nil-safe getters, so this
// handler no longer dereferences absent metadata itself.
func (rs *streamRegistryServer) Exist(ctx context.Context, req *databasev1.StreamRegistryServiceExistRequest) (*databasev1.StreamRegistryServiceExistResponse, error) {
	g := req.GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "exist")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "exist")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream", "exist")
	}()
	_, err := rs.Get(ctx, &databasev1.StreamRegistryServiceGetRequest{Metadata: req.GetMetadata()})
	if err == nil {
		return &databasev1.StreamRegistryServiceExistResponse{
			HasGroup:  true,
			HasStream: true,
		}, nil
	}
	// Get failed: let groupExist decide between "group missing", "only the
	// stream missing" and a genuine error.
	exist, errGroup := groupExist(ctx, err, req.GetMetadata(), rs.schemaRegistry.GroupRegistry())
	if errGroup != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "stream", "exist")
		return nil, errGroup
	}
	return &databasev1.StreamRegistryServiceExistResponse{HasGroup: exist, HasStream: false}, nil
}
// groupExist is called after a resource lookup failed with errResource and
// reports whether the group named in metadata exists.
//
// Any errResource other than schema.ErrGRPCResourceNotFound is returned as-is.
// A nil metadata yields an InvalidArgument status error. Otherwise the group is
// looked up in groupRegistry: success gives (true, nil), a not-found error
// gives (false, nil), and any other lookup error is returned.
func groupExist(ctx context.Context, errResource error, metadata *commonv1.Metadata, groupRegistry schema.Group) (bool, error) {
	switch {
	case !errors.Is(errResource, schema.ErrGRPCResourceNotFound):
		return false, errResource
	case metadata == nil:
		return false, status.Error(codes.InvalidArgument, "metadata is absent")
	}
	_, lookupErr := groupRegistry.GetGroup(ctx, metadata.Group)
	switch {
	case lookupErr == nil:
		return true, nil
	case errors.Is(lookupErr, schema.ErrGRPCResourceNotFound):
		return false, nil
	default:
		return false, lookupErr
	}
}
// indexRuleBindingRegistryServer holds the state used by the index-rule-binding
// registry handlers defined on it: the schema repository they delegate to and
// the metrics they update on every call.
type indexRuleBindingRegistryServer struct {
// Embedded generated base type for the index-rule-binding registry service.
databasev1.UnimplementedIndexRuleBindingRegistryServiceServer
// schemaRegistry provides the index-rule-binding and group registries the handlers call.
schemaRegistry metadata.Repo
// metrics carries the started/finished/latency/error registry metrics.
metrics *metrics
}
// Create hands the index rule binding carried by req to the index-rule-binding
// registry and returns an empty response on success.
//
// The metrics group label is read through the generated nil-safe getters, so a
// request whose binding or binding metadata is absent yields an empty label
// instead of a nil-pointer panic in this handler; the request is still passed
// to the registry as-is. Started, finished and latency metrics are always
// recorded; a registry error is counted and returned unchanged.
func (rs *indexRuleBindingRegistryServer) Create(ctx context.Context,
	req *databasev1.IndexRuleBindingRegistryServiceCreateRequest) (
	*databasev1.IndexRuleBindingRegistryServiceCreateResponse, error,
) {
	g := req.GetIndexRuleBinding().GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "create")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding", "create")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "indexRuleBinding", "create")
	}()
	if err := rs.schemaRegistry.IndexRuleBindingRegistry().CreateIndexRuleBinding(ctx, req.GetIndexRuleBinding()); err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding", "create")
		return nil, err
	}
	return &databasev1.IndexRuleBindingRegistryServiceCreateResponse{}, nil
}
// Update hands the index rule binding carried by req to the registry's update
// call and returns an empty response on success.
//
// The metrics group label is read through the generated nil-safe getters, so a
// request whose binding or binding metadata is absent yields an empty label
// instead of a nil-pointer panic in this handler; the request is still passed
// to the registry as-is. Started, finished and latency metrics are always
// recorded; a registry error is counted and returned unchanged.
func (rs *indexRuleBindingRegistryServer) Update(ctx context.Context,
	req *databasev1.IndexRuleBindingRegistryServiceUpdateRequest) (
	*databasev1.IndexRuleBindingRegistryServiceUpdateResponse, error,
) {
	g := req.GetIndexRuleBinding().GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "update")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding", "update")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "indexRuleBinding", "update")
	}()
	if err := rs.schemaRegistry.IndexRuleBindingRegistry().UpdateIndexRuleBinding(ctx, req.GetIndexRuleBinding()); err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding", "update")
		return nil, err
	}
	return &databasev1.IndexRuleBindingRegistryServiceUpdateResponse{}, nil
}
// Delete asks the index-rule-binding registry to delete the binding identified
// by the request metadata and returns the registry's boolean result in Deleted.
//
// The metrics group label is read through the generated nil-safe getters, so
// absent metadata yields an empty label instead of a nil-pointer panic in this
// handler. Started, finished and latency metrics are always recorded; a
// registry error is counted and returned unchanged.
func (rs *indexRuleBindingRegistryServer) Delete(ctx context.Context,
	req *databasev1.IndexRuleBindingRegistryServiceDeleteRequest) (
	*databasev1.IndexRuleBindingRegistryServiceDeleteResponse, error,
) {
	g := req.GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "delete")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding", "delete")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "indexRuleBinding", "delete")
	}()
	ok, err := rs.schemaRegistry.IndexRuleBindingRegistry().DeleteIndexRuleBinding(ctx, req.GetMetadata())
	if err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding", "delete")
		return nil, err
	}
	return &databasev1.IndexRuleBindingRegistryServiceDeleteResponse{
		Deleted: ok,
	}, nil
}
// Get fetches the index rule binding identified by the request metadata from
// the registry and returns it in the response.
//
// The metrics group label is read through the generated nil-safe getters, so
// absent metadata yields an empty label instead of a nil-pointer panic in this
// handler. Started, finished and latency metrics are always recorded; a
// registry error is counted and returned unchanged.
func (rs *indexRuleBindingRegistryServer) Get(ctx context.Context,
	req *databasev1.IndexRuleBindingRegistryServiceGetRequest) (
	*databasev1.IndexRuleBindingRegistryServiceGetResponse, error,
) {
	g := req.GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "get")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding", "get")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "indexRuleBinding", "get")
	}()
	entity, err := rs.schemaRegistry.IndexRuleBindingRegistry().GetIndexRuleBinding(ctx, req.GetMetadata())
	if err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding", "get")
		return nil, err
	}
	return &databasev1.IndexRuleBindingRegistryServiceGetResponse{
		IndexRuleBinding: entity,
	}, nil
}
// List returns the index rule bindings the registry lists for the group named
// in req.
//
// Started, finished and latency metrics are recorded for every call with the
// labels (group, "indexRuleBinding", "list"); a registry error is counted and
// returned unchanged.
func (rs *indexRuleBindingRegistryServer) List(ctx context.Context,
	req *databasev1.IndexRuleBindingRegistryServiceListRequest) (
	*databasev1.IndexRuleBindingRegistryServiceListResponse, error,
) {
	const kind, op = "indexRuleBinding", "list"
	group := req.Group
	rs.metrics.totalRegistryStarted.Inc(1, group, kind, op)
	begin := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, group, kind, op)
		rs.metrics.totalRegistryLatency.Inc(time.Since(begin).Seconds(), group, kind, op)
	}()
	opt := schema.ListOpt{Group: req.GetGroup()}
	bindings, listErr := rs.schemaRegistry.IndexRuleBindingRegistry().ListIndexRuleBinding(ctx, opt)
	if listErr != nil {
		rs.metrics.totalRegistryErr.Inc(1, group, kind, op)
		return nil, listErr
	}
	resp := &databasev1.IndexRuleBindingRegistryServiceListResponse{IndexRuleBinding: bindings}
	return resp, nil
}
// Exist reports whether an index rule binding and its group exist.
//
// The lookup is delegated to Get (which records its own "get" metrics). When
// Get succeeds both HasGroup and HasIndexRuleBinding are true. When Get fails,
// groupExist is consulted with that error: an error from groupExist is counted
// and returned, otherwise HasGroup carries its result and HasIndexRuleBinding
// is false.
//
// The request metadata is read through the generated nil-safe getters, so this
// handler no longer dereferences absent metadata itself.
func (rs *indexRuleBindingRegistryServer) Exist(ctx context.Context, req *databasev1.IndexRuleBindingRegistryServiceExistRequest) (
	*databasev1.IndexRuleBindingRegistryServiceExistResponse, error,
) {
	g := req.GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "exist")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding", "exist")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "indexRuleBinding", "exist")
	}()
	_, err := rs.Get(ctx, &databasev1.IndexRuleBindingRegistryServiceGetRequest{Metadata: req.GetMetadata()})
	if err == nil {
		return &databasev1.IndexRuleBindingRegistryServiceExistResponse{
			HasGroup:            true,
			HasIndexRuleBinding: true,
		}, nil
	}
	// Get failed: let groupExist decide between "group missing", "only the
	// binding missing" and a genuine error.
	exist, errGroup := groupExist(ctx, err, req.GetMetadata(), rs.schemaRegistry.GroupRegistry())
	if errGroup != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding", "exist")
		return nil, errGroup
	}
	return &databasev1.IndexRuleBindingRegistryServiceExistResponse{HasGroup: exist, HasIndexRuleBinding: false}, nil
}
// indexRuleRegistryServer holds the state used by the index-rule registry
// handlers defined on it: the schema repository they delegate to and the
// metrics they update on every call.
type indexRuleRegistryServer struct {
// Embedded generated base type for the index-rule registry service.
databasev1.UnimplementedIndexRuleRegistryServiceServer
// schemaRegistry provides the index-rule and group registries the handlers call.
schemaRegistry metadata.Repo
// metrics carries the started/finished/latency/error registry metrics.
metrics *metrics
}
// Create hands the index rule carried by req to the index-rule registry and
// returns an empty response on success.
//
// The metrics group label is read through the generated nil-safe getters, so a
// request whose index rule or rule metadata is absent yields an empty label
// instead of a nil-pointer panic in this handler; the request is still passed
// to the registry as-is. Started, finished and latency metrics are always
// recorded; a registry error is counted and returned unchanged.
func (rs *indexRuleRegistryServer) Create(ctx context.Context,
	req *databasev1.IndexRuleRegistryServiceCreateRequest) (
	*databasev1.IndexRuleRegistryServiceCreateResponse, error,
) {
	g := req.GetIndexRule().GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "create")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule", "create")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "indexRule", "create")
	}()
	if err := rs.schemaRegistry.IndexRuleRegistry().CreateIndexRule(ctx, req.GetIndexRule()); err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "create")
		return nil, err
	}
	return &databasev1.IndexRuleRegistryServiceCreateResponse{}, nil
}
// Update hands the index rule carried by req to the registry's update call and
// returns an empty response on success.
//
// The metrics group label is read through the generated nil-safe getters, so a
// request whose index rule or rule metadata is absent yields an empty label
// instead of a nil-pointer panic in this handler; the request is still passed
// to the registry as-is. Started, finished and latency metrics are always
// recorded; a registry error is counted and returned unchanged.
func (rs *indexRuleRegistryServer) Update(ctx context.Context,
	req *databasev1.IndexRuleRegistryServiceUpdateRequest) (
	*databasev1.IndexRuleRegistryServiceUpdateResponse, error,
) {
	g := req.GetIndexRule().GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "update")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule", "update")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "indexRule", "update")
	}()
	if err := rs.schemaRegistry.IndexRuleRegistry().UpdateIndexRule(ctx, req.GetIndexRule()); err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "update")
		return nil, err
	}
	return &databasev1.IndexRuleRegistryServiceUpdateResponse{}, nil
}
// Delete asks the index-rule registry to delete the rule identified by the
// request metadata and returns the registry's boolean result in Deleted.
//
// The metrics group label is read through the generated nil-safe getters, so
// absent metadata yields an empty label instead of a nil-pointer panic in this
// handler. Started, finished and latency metrics are always recorded; a
// registry error is counted and returned unchanged.
func (rs *indexRuleRegistryServer) Delete(ctx context.Context,
	req *databasev1.IndexRuleRegistryServiceDeleteRequest) (
	*databasev1.IndexRuleRegistryServiceDeleteResponse, error,
) {
	g := req.GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "delete")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule", "delete")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "indexRule", "delete")
	}()
	ok, err := rs.schemaRegistry.IndexRuleRegistry().DeleteIndexRule(ctx, req.GetMetadata())
	if err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "delete")
		return nil, err
	}
	return &databasev1.IndexRuleRegistryServiceDeleteResponse{
		Deleted: ok,
	}, nil
}
// Get fetches the index rule identified by the request metadata from the
// index-rule registry and returns it in the response.
//
// The metrics group label is read through the generated nil-safe getters, so
// absent metadata yields an empty label instead of a nil-pointer panic in this
// handler. Started, finished and latency metrics are always recorded; a
// registry error is counted and returned unchanged.
func (rs *indexRuleRegistryServer) Get(ctx context.Context,
	req *databasev1.IndexRuleRegistryServiceGetRequest) (
	*databasev1.IndexRuleRegistryServiceGetResponse, error,
) {
	g := req.GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "get")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule", "get")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "indexRule", "get")
	}()
	entity, err := rs.schemaRegistry.IndexRuleRegistry().GetIndexRule(ctx, req.GetMetadata())
	if err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "get")
		return nil, err
	}
	return &databasev1.IndexRuleRegistryServiceGetResponse{
		IndexRule: entity,
	}, nil
}
// List returns the index rules the registry lists for the group named in req.
//
// Started, finished and latency metrics are recorded for every call with the
// labels (group, "indexRule", "list"); a registry error is counted and
// returned unchanged.
func (rs *indexRuleRegistryServer) List(ctx context.Context,
	req *databasev1.IndexRuleRegistryServiceListRequest) (
	*databasev1.IndexRuleRegistryServiceListResponse, error,
) {
	const kind, op = "indexRule", "list"
	group := req.Group
	rs.metrics.totalRegistryStarted.Inc(1, group, kind, op)
	begin := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, group, kind, op)
		rs.metrics.totalRegistryLatency.Inc(time.Since(begin).Seconds(), group, kind, op)
	}()
	opt := schema.ListOpt{Group: req.GetGroup()}
	rules, listErr := rs.schemaRegistry.IndexRuleRegistry().ListIndexRule(ctx, opt)
	if listErr != nil {
		rs.metrics.totalRegistryErr.Inc(1, group, kind, op)
		return nil, listErr
	}
	resp := &databasev1.IndexRuleRegistryServiceListResponse{IndexRule: rules}
	return resp, nil
}
// Exist reports whether an index rule and its group exist.
//
// The lookup is delegated to Get (which records its own "get" metrics). When
// Get succeeds both HasGroup and HasIndexRule are true. When Get fails,
// groupExist is consulted with that error: an error from groupExist is counted
// and returned, otherwise HasGroup carries its result and HasIndexRule is
// false.
//
// The request metadata is read through the generated nil-safe getters, so this
// handler no longer dereferences absent metadata itself.
func (rs *indexRuleRegistryServer) Exist(ctx context.Context, req *databasev1.IndexRuleRegistryServiceExistRequest) (
	*databasev1.IndexRuleRegistryServiceExistResponse, error,
) {
	g := req.GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "exist")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule", "exist")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "indexRule", "exist")
	}()
	_, err := rs.Get(ctx, &databasev1.IndexRuleRegistryServiceGetRequest{Metadata: req.GetMetadata()})
	if err == nil {
		return &databasev1.IndexRuleRegistryServiceExistResponse{
			HasGroup:     true,
			HasIndexRule: true,
		}, nil
	}
	// Get failed: let groupExist decide between "group missing", "only the
	// rule missing" and a genuine error.
	exist, errGroup := groupExist(ctx, err, req.GetMetadata(), rs.schemaRegistry.GroupRegistry())
	if errGroup != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "exist")
		return nil, errGroup
	}
	return &databasev1.IndexRuleRegistryServiceExistResponse{HasGroup: exist, HasIndexRule: false}, nil
}
// measureRegistryServer holds the state used by the measure registry handlers
// defined on it: the schema repository they delegate to and the metrics they
// update on every call.
type measureRegistryServer struct {
// Embedded generated base type for the measure registry service.
databasev1.UnimplementedMeasureRegistryServiceServer
// schemaRegistry provides the measure and group registries the handlers call.
schemaRegistry metadata.Repo
// metrics carries the started/finished/latency/error registry metrics.
metrics *metrics
}
// Create hands the measure carried by req to the measure registry and returns
// the modification revision reported by the registry.
//
// The metrics group label is read through the generated nil-safe getters, so a
// request whose measure or measure metadata is absent yields an empty label
// instead of a nil-pointer panic in this handler; the request is still passed
// to the registry as-is. Started, finished and latency metrics are always
// recorded; a registry error is counted and returned unchanged.
func (rs *measureRegistryServer) Create(ctx context.Context,
	req *databasev1.MeasureRegistryServiceCreateRequest) (
	*databasev1.MeasureRegistryServiceCreateResponse, error,
) {
	g := req.GetMeasure().GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "create")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "create")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure", "create")
	}()
	modRevision, err := rs.schemaRegistry.MeasureRegistry().CreateMeasure(ctx, req.GetMeasure())
	if err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "measure", "create")
		return nil, err
	}
	return &databasev1.MeasureRegistryServiceCreateResponse{
		ModRevision: modRevision,
	}, nil
}
// Update hands the measure carried by req to the registry's update call and
// returns the modification revision reported by the registry.
//
// The metrics group label is read through the generated nil-safe getters, so a
// request whose measure or measure metadata is absent yields an empty label
// instead of a nil-pointer panic in this handler; the request is still passed
// to the registry as-is. Started, finished and latency metrics are always
// recorded; a registry error is counted and returned unchanged.
func (rs *measureRegistryServer) Update(ctx context.Context,
	req *databasev1.MeasureRegistryServiceUpdateRequest) (
	*databasev1.MeasureRegistryServiceUpdateResponse, error,
) {
	g := req.GetMeasure().GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "update")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "update")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure", "update")
	}()
	modRevision, err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure())
	if err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "measure", "update")
		return nil, err
	}
	return &databasev1.MeasureRegistryServiceUpdateResponse{
		ModRevision: modRevision,
	}, nil
}
// Delete asks the measure registry to delete the measure identified by the
// request metadata and returns the registry's boolean result in Deleted.
//
// The metrics group label is read through the generated nil-safe getters, so
// absent metadata yields an empty label instead of a nil-pointer panic in this
// handler. Started, finished and latency metrics are always recorded; a
// registry error is counted and returned unchanged.
func (rs *measureRegistryServer) Delete(ctx context.Context,
	req *databasev1.MeasureRegistryServiceDeleteRequest) (
	*databasev1.MeasureRegistryServiceDeleteResponse, error,
) {
	g := req.GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "delete")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "delete")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure", "delete")
	}()
	ok, err := rs.schemaRegistry.MeasureRegistry().DeleteMeasure(ctx, req.GetMetadata())
	if err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "measure", "delete")
		return nil, err
	}
	return &databasev1.MeasureRegistryServiceDeleteResponse{
		Deleted: ok,
	}, nil
}
// Get fetches the measure identified by the request metadata from the measure
// registry and returns it in the response.
//
// The metrics group label is read through the generated nil-safe getters, so
// absent metadata yields an empty label instead of a nil-pointer panic in this
// handler. Started, finished and latency metrics are always recorded; a
// registry error is counted and returned unchanged.
func (rs *measureRegistryServer) Get(ctx context.Context,
	req *databasev1.MeasureRegistryServiceGetRequest) (
	*databasev1.MeasureRegistryServiceGetResponse, error,
) {
	g := req.GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "get")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "get")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure", "get")
	}()
	entity, err := rs.schemaRegistry.MeasureRegistry().GetMeasure(ctx, req.GetMetadata())
	if err != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "measure", "get")
		return nil, err
	}
	return &databasev1.MeasureRegistryServiceGetResponse{
		Measure: entity,
	}, nil
}
// List returns the measures the registry lists for the group named in req.
//
// Started, finished and latency metrics are recorded for every call with the
// labels (group, "measure", "list"); a registry error is counted and returned
// unchanged.
func (rs *measureRegistryServer) List(ctx context.Context,
	req *databasev1.MeasureRegistryServiceListRequest) (
	*databasev1.MeasureRegistryServiceListResponse, error,
) {
	const kind, op = "measure", "list"
	group := req.Group
	rs.metrics.totalRegistryStarted.Inc(1, group, kind, op)
	begin := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, group, kind, op)
		rs.metrics.totalRegistryLatency.Inc(time.Since(begin).Seconds(), group, kind, op)
	}()
	opt := schema.ListOpt{Group: req.GetGroup()}
	measures, listErr := rs.schemaRegistry.MeasureRegistry().ListMeasure(ctx, opt)
	if listErr != nil {
		rs.metrics.totalRegistryErr.Inc(1, group, kind, op)
		return nil, listErr
	}
	resp := &databasev1.MeasureRegistryServiceListResponse{Measure: measures}
	return resp, nil
}
// Exist reports whether a measure and its group exist.
//
// The lookup is delegated to Get (which records its own "get" metrics). When
// Get succeeds both HasGroup and HasMeasure are true. When Get fails,
// groupExist is consulted with that error: an error from groupExist is counted
// and returned, otherwise HasGroup carries its result and HasMeasure is false.
//
// The request metadata is read through the generated nil-safe getters, so this
// handler no longer dereferences absent metadata itself.
func (rs *measureRegistryServer) Exist(ctx context.Context,
	req *databasev1.MeasureRegistryServiceExistRequest) (
	*databasev1.MeasureRegistryServiceExistResponse, error,
) {
	g := req.GetMetadata().GetGroup()
	rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "exist")
	start := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "exist")
		rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure", "exist")
	}()
	_, err := rs.Get(ctx, &databasev1.MeasureRegistryServiceGetRequest{Metadata: req.GetMetadata()})
	if err == nil {
		return &databasev1.MeasureRegistryServiceExistResponse{
			HasGroup:   true,
			HasMeasure: true,
		}, nil
	}
	// Get failed: let groupExist decide between "group missing", "only the
	// measure missing" and a genuine error.
	exist, errGroup := groupExist(ctx, err, req.GetMetadata(), rs.schemaRegistry.GroupRegistry())
	if errGroup != nil {
		rs.metrics.totalRegistryErr.Inc(1, g, "measure", "exist")
		return nil, errGroup
	}
	return &databasev1.MeasureRegistryServiceExistResponse{HasGroup: exist, HasMeasure: false}, nil
}
// groupRegistryServer holds the state used by the group registry handlers
// defined on it: the schema repository they delegate to and the metrics they
// update on every call.
type groupRegistryServer struct {
// Embedded generated base type for the group registry service.
databasev1.UnimplementedGroupRegistryServiceServer
// schemaRegistry provides the group registry the handlers call.
schemaRegistry metadata.Repo
// metrics carries the started/finished/latency/error registry metrics.
metrics *metrics
}
// Create hands the group carried by req to the group registry and returns an
// empty response on success.
//
// Group operations are recorded with an empty group label. Started, finished
// and latency metrics are always recorded; a registry error is counted and
// returned unchanged.
func (rs *groupRegistryServer) Create(ctx context.Context, req *databasev1.GroupRegistryServiceCreateRequest) (
	*databasev1.GroupRegistryServiceCreateResponse, error,
) {
	const group, kind, op = "", "group", "create"
	rs.metrics.totalRegistryStarted.Inc(1, group, kind, op)
	begin := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, group, kind, op)
		rs.metrics.totalRegistryLatency.Inc(time.Since(begin).Seconds(), group, kind, op)
	}()
	createErr := rs.schemaRegistry.GroupRegistry().CreateGroup(ctx, req.GetGroup())
	if createErr != nil {
		rs.metrics.totalRegistryErr.Inc(1, group, kind, op)
		return nil, createErr
	}
	return &databasev1.GroupRegistryServiceCreateResponse{}, nil
}
// Update hands the group carried by req to the group registry's update call
// and returns an empty response on success.
//
// Group operations are recorded with an empty group label. Started, finished
// and latency metrics are always recorded; a registry error is counted and
// returned unchanged.
func (rs *groupRegistryServer) Update(ctx context.Context, req *databasev1.GroupRegistryServiceUpdateRequest) (
	*databasev1.GroupRegistryServiceUpdateResponse, error,
) {
	const group, kind, op = "", "group", "update"
	rs.metrics.totalRegistryStarted.Inc(1, group, kind, op)
	begin := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, group, kind, op)
		rs.metrics.totalRegistryLatency.Inc(time.Since(begin).Seconds(), group, kind, op)
	}()
	updateErr := rs.schemaRegistry.GroupRegistry().UpdateGroup(ctx, req.GetGroup())
	if updateErr != nil {
		rs.metrics.totalRegistryErr.Inc(1, group, kind, op)
		return nil, updateErr
	}
	return &databasev1.GroupRegistryServiceUpdateResponse{}, nil
}
// Delete asks the group registry to delete the group named in req and returns
// the registry's boolean result in Deleted.
//
// Group operations are recorded with an empty group label. Started, finished
// and latency metrics are always recorded; a registry error is counted and
// returned unchanged.
func (rs *groupRegistryServer) Delete(ctx context.Context, req *databasev1.GroupRegistryServiceDeleteRequest) (
	*databasev1.GroupRegistryServiceDeleteResponse, error,
) {
	const group, kind, op = "", "group", "delete"
	rs.metrics.totalRegistryStarted.Inc(1, group, kind, op)
	begin := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, group, kind, op)
		rs.metrics.totalRegistryLatency.Inc(time.Since(begin).Seconds(), group, kind, op)
	}()
	removed, deleteErr := rs.schemaRegistry.GroupRegistry().DeleteGroup(ctx, req.GetGroup())
	if deleteErr != nil {
		rs.metrics.totalRegistryErr.Inc(1, group, kind, op)
		return nil, deleteErr
	}
	resp := &databasev1.GroupRegistryServiceDeleteResponse{Deleted: removed}
	return resp, nil
}
// Get fetches the group named in req from the group registry and returns it in
// the response.
//
// Group operations are recorded with an empty group label. Started, finished
// and latency metrics are always recorded; a registry error is counted and
// returned unchanged.
func (rs *groupRegistryServer) Get(ctx context.Context, req *databasev1.GroupRegistryServiceGetRequest) (
	*databasev1.GroupRegistryServiceGetResponse, error,
) {
	const label, kind, op = "", "group", "get"
	rs.metrics.totalRegistryStarted.Inc(1, label, kind, op)
	begin := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, label, kind, op)
		rs.metrics.totalRegistryLatency.Inc(time.Since(begin).Seconds(), label, kind, op)
	}()
	found, getErr := rs.schemaRegistry.GroupRegistry().GetGroup(ctx, req.GetGroup())
	if getErr != nil {
		rs.metrics.totalRegistryErr.Inc(1, label, kind, op)
		return nil, getErr
	}
	resp := &databasev1.GroupRegistryServiceGetResponse{Group: found}
	return resp, nil
}
// List returns the groups reported by the group registry. The request message
// is not consulted.
//
// Group operations are recorded with an empty group label. Started, finished
// and latency metrics are always recorded; a registry error is counted and
// returned unchanged.
func (rs *groupRegistryServer) List(ctx context.Context, _ *databasev1.GroupRegistryServiceListRequest) (
	*databasev1.GroupRegistryServiceListResponse, error,
) {
	const label, kind, op = "", "group", "list"
	rs.metrics.totalRegistryStarted.Inc(1, label, kind, op)
	begin := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, label, kind, op)
		rs.metrics.totalRegistryLatency.Inc(time.Since(begin).Seconds(), label, kind, op)
	}()
	all, listErr := rs.schemaRegistry.GroupRegistry().ListGroup(ctx)
	if listErr != nil {
		rs.metrics.totalRegistryErr.Inc(1, label, kind, op)
		return nil, listErr
	}
	resp := &databasev1.GroupRegistryServiceListResponse{Group: all}
	return resp, nil
}
// Exist reports whether the group named in req exists.
//
// The lookup is delegated to Get (which records its own "get" metrics):
// success yields HasGroup true, schema.ErrGRPCResourceNotFound yields HasGroup
// false, and any other error is counted and returned. Group operations are
// recorded with an empty group label.
func (rs *groupRegistryServer) Exist(ctx context.Context, req *databasev1.GroupRegistryServiceExistRequest) (
	*databasev1.GroupRegistryServiceExistResponse, error,
) {
	const label, kind, op = "", "group", "exist"
	rs.metrics.totalRegistryStarted.Inc(1, label, kind, op)
	begin := time.Now()
	defer func() {
		rs.metrics.totalRegistryFinished.Inc(1, label, kind, op)
		rs.metrics.totalRegistryLatency.Inc(time.Since(begin).Seconds(), label, kind, op)
	}()
	_, getErr := rs.Get(ctx, &databasev1.GroupRegistryServiceGetRequest{Group: req.Group})
	switch {
	case getErr == nil:
		return &databasev1.GroupRegistryServiceExistResponse{HasGroup: true}, nil
	case errors.Is(getErr, schema.ErrGRPCResourceNotFound):
		return &databasev1.GroupRegistryServiceExistResponse{HasGroup: false}, nil
	default:
		rs.metrics.totalRegistryErr.Inc(1, label, kind, op)
		return nil, getErr
	}
}
// topNAggregationRegistryServer holds the state used by the top-N aggregation
// registry handlers defined on it: the schema repository they delegate to and
// the metrics they update on every call.
type topNAggregationRegistryServer struct {
// Embedded generated base type for the top-N aggregation registry service.
databasev1.UnimplementedTopNAggregationRegistryServiceServer
// schemaRegistry provides the top-N aggregation and group registries the handlers call.
schemaRegistry metadata.Repo
// metrics carries the started/finished/latency/error registry metrics.
metrics *metrics
}
// Create hands the top-N aggregation carried by req to the registry and
// returns an empty response on success.
//
// The metrics group label is read through the generated nil-safe getters, so a
// request whose aggregation or aggregation metadata is absent yields an empty
// label instead of a nil-pointer panic in this handler; the request is still
// passed to the registry as-is. Started, finished and latency metrics are
// always recorded; a registry error is counted and returned unchanged.
func (ts *topNAggregationRegistryServer) Create(ctx context.Context,
	req *databasev1.TopNAggregationRegistryServiceCreateRequest,
) (*databasev1.TopNAggregationRegistryServiceCreateResponse, error) {
	g := req.GetTopNAggregation().GetMetadata().GetGroup()
	ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "create")
	start := time.Now()
	defer func() {
		ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation", "create")
		ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "topn_aggregation", "create")
	}()
	if err := ts.schemaRegistry.TopNAggregationRegistry().CreateTopNAggregation(ctx, req.GetTopNAggregation()); err != nil {
		ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation", "create")
		return nil, err
	}
	return &databasev1.TopNAggregationRegistryServiceCreateResponse{}, nil
}
// Update hands the top-N aggregation carried by req to the registry's update
// call and returns an empty response on success.
//
// The metrics group label is read through the generated nil-safe getters, so a
// request whose aggregation or aggregation metadata is absent yields an empty
// label instead of a nil-pointer panic in this handler; the request is still
// passed to the registry as-is. Started, finished and latency metrics are
// always recorded; a registry error is counted and returned unchanged.
func (ts *topNAggregationRegistryServer) Update(ctx context.Context,
	req *databasev1.TopNAggregationRegistryServiceUpdateRequest,
) (*databasev1.TopNAggregationRegistryServiceUpdateResponse, error) {
	g := req.GetTopNAggregation().GetMetadata().GetGroup()
	ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "update")
	start := time.Now()
	defer func() {
		ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation", "update")
		ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "topn_aggregation", "update")
	}()
	if err := ts.schemaRegistry.TopNAggregationRegistry().UpdateTopNAggregation(ctx, req.GetTopNAggregation()); err != nil {
		ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation", "update")
		return nil, err
	}
	return &databasev1.TopNAggregationRegistryServiceUpdateResponse{}, nil
}
// Delete asks the top-N aggregation registry to delete the aggregation
// identified by the request metadata and returns the registry's boolean result
// in Deleted.
//
// The metrics group label is read through the generated nil-safe getters, so
// absent metadata yields an empty label instead of a nil-pointer panic in this
// handler. Started, finished and latency metrics are always recorded; a
// registry error is counted and returned unchanged.
func (ts *topNAggregationRegistryServer) Delete(ctx context.Context,
	req *databasev1.TopNAggregationRegistryServiceDeleteRequest,
) (*databasev1.TopNAggregationRegistryServiceDeleteResponse, error) {
	g := req.GetMetadata().GetGroup()
	ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "delete")
	start := time.Now()
	defer func() {
		ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation", "delete")
		ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "topn_aggregation", "delete")
	}()
	ok, err := ts.schemaRegistry.TopNAggregationRegistry().DeleteTopNAggregation(ctx, req.GetMetadata())
	if err != nil {
		ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation", "delete")
		return nil, err
	}
	return &databasev1.TopNAggregationRegistryServiceDeleteResponse{
		Deleted: ok,
	}, nil
}
// Get fetches the top-N aggregation identified by the request metadata from
// the registry and returns it in the response.
//
// The metrics group label is read through the generated nil-safe getters, so
// absent metadata yields an empty label instead of a nil-pointer panic in this
// handler. Started, finished and latency metrics are always recorded; a
// registry error is counted and returned unchanged.
func (ts *topNAggregationRegistryServer) Get(ctx context.Context,
	req *databasev1.TopNAggregationRegistryServiceGetRequest,
) (*databasev1.TopNAggregationRegistryServiceGetResponse, error) {
	g := req.GetMetadata().GetGroup()
	ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "get")
	start := time.Now()
	defer func() {
		ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation", "get")
		ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "topn_aggregation", "get")
	}()
	entity, err := ts.schemaRegistry.TopNAggregationRegistry().GetTopNAggregation(ctx, req.GetMetadata())
	if err != nil {
		ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation", "get")
		return nil, err
	}
	return &databasev1.TopNAggregationRegistryServiceGetResponse{
		TopNAggregation: entity,
	}, nil
}
// List returns the top-N aggregations the registry lists for the group named
// in req.
//
// Started, finished and latency metrics are recorded for every call with the
// labels (group, "topn_aggregation", "list"); a registry error is counted and
// returned unchanged.
func (ts *topNAggregationRegistryServer) List(ctx context.Context,
	req *databasev1.TopNAggregationRegistryServiceListRequest,
) (*databasev1.TopNAggregationRegistryServiceListResponse, error) {
	const kind, op = "topn_aggregation", "list"
	group := req.Group
	ts.metrics.totalRegistryStarted.Inc(1, group, kind, op)
	begin := time.Now()
	defer func() {
		ts.metrics.totalRegistryFinished.Inc(1, group, kind, op)
		ts.metrics.totalRegistryLatency.Inc(time.Since(begin).Seconds(), group, kind, op)
	}()
	opt := schema.ListOpt{Group: req.GetGroup()}
	aggregations, listErr := ts.schemaRegistry.TopNAggregationRegistry().ListTopNAggregation(ctx, opt)
	if listErr != nil {
		ts.metrics.totalRegistryErr.Inc(1, group, kind, op)
		return nil, listErr
	}
	resp := &databasev1.TopNAggregationRegistryServiceListResponse{TopNAggregation: aggregations}
	return resp, nil
}
// Exist reports whether a top-N aggregation and its group exist.
//
// The lookup is delegated to Get (which records its own "get" metrics). When
// Get succeeds both HasGroup and HasTopNAggregation are true. When Get fails,
// groupExist is consulted with that error: an error from groupExist is counted
// and returned, otherwise HasGroup carries its result and HasTopNAggregation
// is false.
//
// The request metadata is read through the generated nil-safe getters, so this
// handler no longer dereferences absent metadata itself.
func (ts *topNAggregationRegistryServer) Exist(ctx context.Context, req *databasev1.TopNAggregationRegistryServiceExistRequest) (
	*databasev1.TopNAggregationRegistryServiceExistResponse, error,
) {
	g := req.GetMetadata().GetGroup()
	ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "exist")
	start := time.Now()
	defer func() {
		ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation", "exist")
		ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "topn_aggregation", "exist")
	}()
	_, err := ts.Get(ctx, &databasev1.TopNAggregationRegistryServiceGetRequest{Metadata: req.GetMetadata()})
	if err == nil {
		return &databasev1.TopNAggregationRegistryServiceExistResponse{
			HasGroup:           true,
			HasTopNAggregation: true,
		}, nil
	}
	// Get failed: let groupExist decide between "group missing", "only the
	// aggregation missing" and a genuine error.
	exist, errGroup := groupExist(ctx, err, req.GetMetadata(), ts.schemaRegistry.GroupRegistry())
	if errGroup != nil {
		ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation", "exist")
		return nil, errGroup
	}
	return &databasev1.TopNAggregationRegistryServiceExistResponse{HasGroup: exist, HasTopNAggregation: false}, nil
}
// propertyRegistryServer holds the state used by the property registry
// handlers defined on it: the schema repository they delegate to and the
// metrics they update on every call.
type propertyRegistryServer struct {
// Embedded generated base type for the property registry service.
databasev1.UnimplementedPropertyRegistryServiceServer
// schemaRegistry provides the property and group registries the handlers call.
schemaRegistry metadata.Repo
// metrics carries the started/finished/latency/error registry metrics.
metrics *metrics
}
// Create hands the property carried by req to the property registry and
// returns an empty response on success.
//
// The metrics group label is read through the generated nil-safe getters, so a
// request whose property or property metadata is absent yields an empty label
// instead of a nil-pointer panic in this handler; the request is still passed
// to the registry as-is. Started, finished and latency metrics are always
// recorded; a registry error is counted and returned unchanged.
func (ps *propertyRegistryServer) Create(ctx context.Context, req *databasev1.PropertyRegistryServiceCreateRequest) (
	*databasev1.PropertyRegistryServiceCreateResponse, error,
) {
	g := req.GetProperty().GetMetadata().GetGroup()
	ps.metrics.totalRegistryStarted.Inc(1, g, "property", "create")
	start := time.Now()
	defer func() {
		ps.metrics.totalRegistryFinished.Inc(1, g, "property", "create")
		ps.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "property", "create")
	}()
	if err := ps.schemaRegistry.PropertyRegistry().CreateProperty(ctx, req.GetProperty()); err != nil {
		ps.metrics.totalRegistryErr.Inc(1, g, "property", "create")
		return nil, err
	}
	return &databasev1.PropertyRegistryServiceCreateResponse{}, nil
}
// Update hands the property carried by req to the registry's update call and
// returns an empty response on success.
//
// The metrics group label is read through the generated nil-safe getters, so a
// request whose property or property metadata is absent yields an empty label
// instead of a nil-pointer panic in this handler; the request is still passed
// to the registry as-is. Started, finished and latency metrics are always
// recorded; a registry error is counted and returned unchanged.
func (ps *propertyRegistryServer) Update(ctx context.Context, req *databasev1.PropertyRegistryServiceUpdateRequest) (
	*databasev1.PropertyRegistryServiceUpdateResponse, error,
) {
	g := req.GetProperty().GetMetadata().GetGroup()
	ps.metrics.totalRegistryStarted.Inc(1, g, "property", "update")
	start := time.Now()
	defer func() {
		ps.metrics.totalRegistryFinished.Inc(1, g, "property", "update")
		ps.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "property", "update")
	}()
	if err := ps.schemaRegistry.PropertyRegistry().UpdateProperty(ctx, req.GetProperty()); err != nil {
		ps.metrics.totalRegistryErr.Inc(1, g, "property", "update")
		return nil, err
	}
	return &databasev1.PropertyRegistryServiceUpdateResponse{}, nil
}
// Delete asks the property registry to delete the property identified by the
// request metadata and returns the registry's boolean result in Deleted.
//
// The metrics group label is read through the generated nil-safe getters, so
// absent metadata yields an empty label instead of a nil-pointer panic in this
// handler. Started, finished and latency metrics are always recorded; a
// registry error is counted and returned unchanged.
func (ps *propertyRegistryServer) Delete(ctx context.Context, req *databasev1.PropertyRegistryServiceDeleteRequest) (
	*databasev1.PropertyRegistryServiceDeleteResponse, error,
) {
	g := req.GetMetadata().GetGroup()
	ps.metrics.totalRegistryStarted.Inc(1, g, "property", "delete")
	start := time.Now()
	defer func() {
		ps.metrics.totalRegistryFinished.Inc(1, g, "property", "delete")
		ps.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "property", "delete")
	}()
	ok, err := ps.schemaRegistry.PropertyRegistry().DeleteProperty(ctx, req.GetMetadata())
	if err != nil {
		ps.metrics.totalRegistryErr.Inc(1, g, "property", "delete")
		return nil, err
	}
	return &databasev1.PropertyRegistryServiceDeleteResponse{
		Deleted: ok,
	}, nil
}
// Get fetches the property identified by the request metadata from the
// property registry and returns it in the response.
//
// The metrics group label is read through the generated nil-safe getters, so
// absent metadata yields an empty label instead of a nil-pointer panic in this
// handler. Started, finished and latency metrics are always recorded; a
// registry error is counted and returned unchanged.
func (ps *propertyRegistryServer) Get(ctx context.Context, req *databasev1.PropertyRegistryServiceGetRequest) (
	*databasev1.PropertyRegistryServiceGetResponse, error,
) {
	g := req.GetMetadata().GetGroup()
	ps.metrics.totalRegistryStarted.Inc(1, g, "property", "get")
	start := time.Now()
	defer func() {
		ps.metrics.totalRegistryFinished.Inc(1, g, "property", "get")
		ps.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "property", "get")
	}()
	entity, err := ps.schemaRegistry.PropertyRegistry().GetProperty(ctx, req.GetMetadata())
	if err != nil {
		ps.metrics.totalRegistryErr.Inc(1, g, "property", "get")
		return nil, err
	}
	return &databasev1.PropertyRegistryServiceGetResponse{
		Property: entity,
	}, nil
}
// List returns the properties the registry lists for the group named in req.
//
// Started, finished and latency metrics are recorded for every call with the
// labels (group, "property", "list"); a registry error is counted and returned
// unchanged.
func (ps *propertyRegistryServer) List(ctx context.Context, req *databasev1.PropertyRegistryServiceListRequest) (
	*databasev1.PropertyRegistryServiceListResponse, error,
) {
	const kind, op = "property", "list"
	group := req.Group
	ps.metrics.totalRegistryStarted.Inc(1, group, kind, op)
	begin := time.Now()
	defer func() {
		ps.metrics.totalRegistryFinished.Inc(1, group, kind, op)
		ps.metrics.totalRegistryLatency.Inc(time.Since(begin).Seconds(), group, kind, op)
	}()
	opt := schema.ListOpt{Group: req.GetGroup()}
	props, listErr := ps.schemaRegistry.PropertyRegistry().ListProperty(ctx, opt)
	if listErr != nil {
		ps.metrics.totalRegistryErr.Inc(1, group, kind, op)
		return nil, listErr
	}
	resp := &databasev1.PropertyRegistryServiceListResponse{Properties: props}
	return resp, nil
}
// Exist reports whether a property and its group exist.
//
// The lookup is delegated to Get (which records its own "get" metrics). When
// Get succeeds both HasGroup and HasProperty are true. When Get fails,
// groupExist is consulted with that error: an error from groupExist is counted
// and returned, otherwise HasGroup carries its result and HasProperty is
// false.
//
// The request metadata is read through the generated nil-safe getters, so this
// handler no longer dereferences absent metadata itself.
func (ps *propertyRegistryServer) Exist(ctx context.Context, req *databasev1.PropertyRegistryServiceExistRequest) (
	*databasev1.PropertyRegistryServiceExistResponse, error,
) {
	g := req.GetMetadata().GetGroup()
	ps.metrics.totalRegistryStarted.Inc(1, g, "property", "exist")
	start := time.Now()
	defer func() {
		ps.metrics.totalRegistryFinished.Inc(1, g, "property", "exist")
		ps.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "property", "exist")
	}()
	_, err := ps.Get(ctx, &databasev1.PropertyRegistryServiceGetRequest{Metadata: req.GetMetadata()})
	if err == nil {
		return &databasev1.PropertyRegistryServiceExistResponse{
			HasGroup:    true,
			HasProperty: true,
		}, nil
	}
	// Get failed: let groupExist decide between "group missing", "only the
	// property missing" and a genuine error.
	exist, errGroup := groupExist(ctx, err, req.GetMetadata(), ps.schemaRegistry.GroupRegistry())
	if errGroup != nil {
		ps.metrics.totalRegistryErr.Inc(1, g, "property", "exist")
		return nil, errGroup
	}
	return &databasev1.PropertyRegistryServiceExistResponse{HasGroup: exist, HasProperty: false}, nil
}