server/service/disco/schema.go (306 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package disco import ( "context" "errors" "fmt" "github.com/apache/servicecomb-service-center/datasource" "github.com/apache/servicecomb-service-center/datasource/schema" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/util" quotasvc "github.com/apache/servicecomb-service-center/server/service/quota" "github.com/apache/servicecomb-service-center/server/service/validator" mapset "github.com/deckarep/golang-set" pb "github.com/go-chassis/cari/discovery" ) // ExistSchema only return the summary without content if schema exist func ExistSchema(ctx context.Context, request *pb.GetSchemaRequest) (*pb.Schema, error) { remoteIP := util.GetIPFromContext(ctx) serviceID := request.ServiceId schemaID := request.SchemaId if checkErr := validator.ValidateGetSchema(request); checkErr != nil { log.Error(fmt.Sprintf("invalid get service[%s] schema[%s] request, operator: %s", serviceID, schemaID, remoteIP), nil) return nil, pb.NewError(pb.ErrInvalidParams, checkErr.Error()) } ref, err := schema.Instance().GetRef(ctx, &schema.RefRequest{ ServiceID: serviceID, SchemaID: schemaID, }) if err != nil { if errors.Is(err, schema.ErrSchemaNotFound) { return existOldSchema(ctx, request) } log.Error(fmt.Sprintf("get service[%s] schema-ref[%s] failed, operator: %s", serviceID, schemaID, remoteIP), nil) return nil, err } return &pb.Schema{ SchemaId: schemaID, Summary: ref.Summary, }, nil } func existOldSchema(ctx context.Context, request *pb.GetSchemaRequest) (*pb.Schema, error) { resp, err := datasource.GetMetadataManager().ExistSchema(ctx, &pb.GetExistenceRequest{ Type: datasource.ExistTypeSchema, ServiceId: request.ServiceId, SchemaId: request.SchemaId, }) if err != nil { return nil, err } return &pb.Schema{ SchemaId: request.SchemaId, Summary: resp.Summary, }, nil } func GetSchema(ctx context.Context, request *pb.GetSchemaRequest) (*pb.Schema, error) { remoteIP := util.GetIPFromContext(ctx) serviceID := request.ServiceId schemaID := request.SchemaId if checkErr := validator.ValidateGetSchema(request); checkErr != nil { log.Error(fmt.Sprintf("invalid get service[%s] schema[%s] request, operator: %s", serviceID, schemaID, remoteIP), nil) return nil, pb.NewError(pb.ErrInvalidParams, checkErr.Error()) } ref, err := schema.Instance().GetRef(ctx, &schema.RefRequest{ ServiceID: serviceID, SchemaID: schemaID, }) if err != nil { if errors.Is(err, schema.ErrSchemaNotFound) { return getOldSchema(ctx, request) } log.Error(fmt.Sprintf("get service[%s] schema-ref[%s] failed, operator: %s", serviceID, schemaID, remoteIP), nil) return nil, err } content, err := schema.Instance().GetContent(ctx, &schema.ContentRequest{ Hash: ref.Hash, }) if err != nil { log.Error(fmt.Sprintf("get service[%s] schema[%s] failed, operator: %s", serviceID, schemaID, remoteIP), nil) return nil, err } return &pb.Schema{ SchemaId: schemaID, Schema: content.Content, Summary: ref.Summary, }, nil } func getOldSchema(ctx context.Context, request *pb.GetSchemaRequest) (*pb.Schema, error) { resp, err := datasource.GetMetadataManager().GetSchema(ctx, request) if err != nil { return nil, err } return &pb.Schema{ SchemaId: request.SchemaId, Schema: resp.Schema, Summary: resp.SchemaSummary, }, nil } func ListSchema(ctx context.Context, request *pb.GetAllSchemaRequest) ([]*pb.Schema, error) { remoteIP := util.GetIPFromContext(ctx) serviceID := request.ServiceId if checkErr := validator.ValidateListSchema(request); checkErr != nil { log.Error(fmt.Sprintf("invalid list service[%s] schemas request, operator: %s", serviceID, remoteIP), nil) return nil, pb.NewError(pb.ErrInvalidParams, checkErr.Error()) } schemaIDs, err := getOldSchemaIDs(ctx, serviceID) if err != nil { log.Error(fmt.Sprintf("list service[%s] schemaIDs failed, operator: %s", serviceID, remoteIP), nil) return nil, err } requests, err := mergeRequests(ctx, serviceID, schemaIDs) if err != nil { log.Error(fmt.Sprintf("list service[%s] schema-refs failed, operator: %s", serviceID, remoteIP), nil) return nil, err } schemas := make([]*pb.Schema, 0, len(requests)) for _, req := range requests { tmp, err := getSchema(ctx, req, request.WithSchema) if err != nil && !errors.Is(err, schema.ErrSchemaNotFound) { return nil, err } item := &pb.Schema{ SchemaId: req.SchemaId, } if tmp != nil { item = tmp } schemas = append(schemas, item) } return schemas, nil } func getSchema(ctx context.Context, req *pb.GetSchemaRequest, withSchema bool) (*pb.Schema, error) { if withSchema { return GetSchema(ctx, req) } return ExistSchema(ctx, req) } func getOldSchemaIDs(ctx context.Context, serviceID string) ([]string, error) { resp, err := datasource.GetMetadataManager().GetAllSchemas(ctx, &pb.GetAllSchemaRequest{ ServiceId: serviceID, WithSchema: false, }) if err != nil { return nil, err } schemaIDs := make([]string, 0, len(resp.Schemas)) for _, item := range resp.Schemas { schemaIDs = append(schemaIDs, item.SchemaId) } return schemaIDs, nil } func mergeRequests(ctx context.Context, serviceID string, oldSchemaIDs []string) ([]*pb.GetSchemaRequest, error) { refs, err := schema.Instance().ListRef(ctx, &schema.RefRequest{ ServiceID: serviceID, }) if err != nil { return nil, err } set := mapset.NewSet() for _, schemaID := range oldSchemaIDs { set.Add(schemaID) } for _, ref := range refs { set.Add(ref.SchemaID) } var requests []*pb.GetSchemaRequest for item := range set.Iter() { requests = append(requests, &pb.GetSchemaRequest{ ServiceId: serviceID, SchemaId: item.(string), }) } return requests, nil } func DeleteSchema(ctx context.Context, request *pb.DeleteSchemaRequest) error { remoteIP := util.GetIPFromContext(ctx) if checkErr := validator.ValidateDeleteSchema(request); checkErr != nil { log.Error(fmt.Sprintf("invalid delete service[%s] schema[%s] request, operator: %s", request.ServiceId, request.SchemaId, remoteIP), checkErr) return pb.NewError(pb.ErrInvalidParams, checkErr.Error()) } _, svcErr := datasource.GetMetadataManager().GetService(ctx, &pb.GetServiceRequest{ ServiceId: request.ServiceId, }) if svcErr != nil { log.Error(fmt.Sprintf("get service[%s] failed, operator: %s", request.ServiceId, remoteIP), svcErr) return svcErr } err := schema.Instance().DeleteRef(ctx, &schema.RefRequest{ ServiceID: request.ServiceId, SchemaID: request.SchemaId, }) if err != nil { if errors.Is(err, schema.ErrSchemaNotFound) { return deleteOldSchema(ctx, request) } log.Error(fmt.Sprintf("delete service[%s] schema[%s] failed, operator: %s", request.ServiceId, request.SchemaId, remoteIP), err) return err } log.Info(fmt.Sprintf("delete service[%s] schema[%s], operator: %s", request.ServiceId, request.SchemaId, remoteIP)) err = deleteOldSchema(ctx, request) if err != nil && !errors.Is(err, schema.ErrSchemaNotFound) { log.Error(fmt.Sprintf("delete old service[%s] schema[%s] failed, operator: %s", request.ServiceId, request.SchemaId, remoteIP), svcErr) return err } return nil } func deleteOldSchema(ctx context.Context, request *pb.DeleteSchemaRequest) error { return datasource.GetMetadataManager().DeleteSchema(ctx, request) } // PutSchemas covers all the schemas of a service. // To cover the old schemas, ModifySchemas adds new schemas into, delete and // modify the old schemas. func PutSchemas(ctx context.Context, request *pb.ModifySchemasRequest) error { remoteIP := util.GetIPFromContext(ctx) serviceID := request.ServiceId if checkErr := validator.ValidatePutSchemas(request); checkErr != nil { log.Error(fmt.Sprintf("invalid modify service[%s] schemas request, operator: %s", serviceID, remoteIP), checkErr) return pb.NewError(pb.ErrInvalidParams, "Invalid request.") } // no need to check quota usage because overwrite existing. apply := len(request.Schemas) schemaIDs := make([]string, 0, apply) contentItems := make([]*schema.ContentItem, 0, apply) for _, item := range request.Schemas { schemaIDs = append(schemaIDs, item.SchemaId) contentItems = append(contentItems, &schema.ContentItem{ Hash: schema.Hash(item.SchemaId, item.Schema), Content: item.Schema, Summary: item.Summary, }) } err := schema.Instance().PutManyContent(ctx, &schema.PutManyContentRequest{ ServiceID: serviceID, SchemaIDs: schemaIDs, Contents: contentItems, }) if err != nil { log.Error(fmt.Sprintf("put modify service[%s] schemas[len: %d] failed, operator: %s", serviceID, apply, remoteIP), err) return err } log.Info(fmt.Sprintf("put service[%s] schemas[len: %d], operator: %s", serviceID, apply, remoteIP)) return nil } // PutSchema modifies a specific schema. func PutSchema(ctx context.Context, request *pb.ModifySchemaRequest) error { remoteIP := util.GetIPFromContext(ctx) serviceID := request.ServiceId schemaID := request.SchemaId chars := len(request.Schema) if checkErr := validator.ValidatePutSchema(request); checkErr != nil { log.Error(fmt.Sprintf("invalid put service[%s] schemas[%s] request, operator: %s", serviceID, schemaID, remoteIP), checkErr) return pb.NewError(pb.ErrInvalidParams, checkErr.Error()) } if quotaErr := checkSchemaQuota(ctx, serviceID, schemaID); quotaErr != nil { log.Error(fmt.Sprintf("check service[%s] schema quota failed, operator: %s", serviceID, remoteIP), quotaErr) return quotaErr } if len(request.Summary) == 0 { log.Warn(fmt.Sprintf("service[%s] schema[%s]'s summary is empty, operator: %s", serviceID, schemaID, remoteIP)) } err := schema.Instance().PutContent(ctx, &schema.PutContentRequest{ ServiceID: request.ServiceId, SchemaID: request.SchemaId, Content: &schema.ContentItem{ Hash: schema.Hash(request.SchemaId, request.Schema), Content: request.Schema, Summary: request.Summary, }, }) if err != nil { log.Error(fmt.Sprintf("put service[%s] schema[%s chars: %d] failed, operator: %s", serviceID, schemaID, chars, remoteIP), err) return err } log.Info(fmt.Sprintf("put service[%s] schema[%s, chars: %d], operator: %s", serviceID, schemaID, chars, remoteIP)) return nil } func checkSchemaQuota(ctx context.Context, serviceID string, schemaID string) error { service, err := datasource.GetMetadataManager().GetService(ctx, &pb.GetServiceRequest{ ServiceId: serviceID, }) if err != nil { return err } if util.SliceHave(service.Schemas, schemaID) { return nil } if errQuota := quotasvc.ApplySchema(ctx, serviceID, 1); errQuota != nil { return errQuota } return nil } func Usage(ctx context.Context, serviceID string) (int64, error) { schemas, err := ListSchema(ctx, &pb.GetAllSchemaRequest{ ServiceId: serviceID, WithSchema: false, }) if err != nil { return 0, err } return int64(len(schemas)), nil }