in datasource/mongo/ms.go [653:728]
// modifySchemas reconciles the schemas stored for a microservice with the
// supplied list.
//
// It loads the service's stored schemas, lets datasource.SchemasAnalysis split
// the request into schemas to update, add and delete, applies for additional
// schema quota when more schemas are added than deleted, and then issues two
// bulk writes: one against the schema collection and one that sets the
// service document's schema ID list to the added IDs followed by the updated
// IDs.
//
// The two bulk writes are independent calls; no transaction is started here.
//
// It returns an ErrUnavailableBackend error when the stored schemas cannot be
// read, the quota error unchanged when the quota request fails, and an
// ErrInternal error when either bulk write fails.
func (ds *MetadataManager) modifySchemas(ctx context.Context, service *discovery.MicroService, schemas []*discovery.Schema) error {
	domain := util.ParseDomain(ctx)
	project := util.ParseProject(ctx)
	operator := util.GetIPFromContext(ctx)
	serviceID := service.ServiceId

	// The lookup of existing schemas filters on the service ID only; the
	// per-schema filters built below additionally carry domain and project.
	stored, err := dao.GetSchemas(ctx, mutil.NewFilter(mutil.ServiceID(serviceID)))
	if err != nil {
		log.Error(fmt.Sprintf("modify service %s schemas failed, get schemas failed, operator: %s", serviceID, operator), err)
		return discovery.NewError(discovery.ErrUnavailableBackend, err.Error())
	}

	// The fourth result of SchemasAnalysis is intentionally unused here.
	toUpdate, toAdd, toDelete, _ := datasource.SchemasAnalysis(schemas, stored, service.Schemas)

	// Quota is only requested for net growth in the number of schemas.
	if growth := len(toAdd) - len(toDelete); growth > 0 {
		if quotaErr := quotasvc.ApplySchema(ctx, serviceID, int64(growth)); quotaErr != nil {
			log.Error(fmt.Sprintf("modify service[%s] schemas failed, operator: %s", serviceID, operator), quotaErr)
			return quotaErr
		}
	}

	schemaWrites := make([]mongo.WriteModel, 0, len(toAdd)+len(toUpdate)+len(toDelete))
	// keptIDs stays nil when nothing is added or updated, so the value handed
	// to ServiceSchemas is the same as an unappended slice would be.
	var keptIDs []string

	for _, added := range toAdd {
		log.Info(fmt.Sprintf("add new schema[%s/%s], operator: %s", serviceID, added.SchemaId, operator))
		doc := &model.Schema{
			Domain:        domain,
			Project:       project,
			ServiceID:     serviceID,
			SchemaID:      added.SchemaId,
			Schema:        added.Schema,
			SchemaSummary: added.Summary,
		}
		schemaWrites = append(schemaWrites, mongo.NewInsertOneModel().SetDocument(doc))
		keptIDs = append(keptIDs, added.SchemaId)
	}

	for _, changed := range toUpdate {
		log.Info(fmt.Sprintf("update schema[%s/%s], operator: %s", serviceID, changed.SchemaId, operator))
		match := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(serviceID), mutil.SchemaID(changed.SchemaId))
		change := mutil.NewFilter(mutil.Set(mutil.NewFilter(
			mutil.Schema(changed.Schema),
			mutil.SchemaSummary(changed.Summary),
		)))
		schemaWrites = append(schemaWrites, mongo.NewUpdateOneModel().SetFilter(match).SetUpdate(change))
		keptIDs = append(keptIDs, changed.SchemaId)
	}

	for _, removed := range toDelete {
		log.Info(fmt.Sprintf("delete non-existent schema[%s/%s], operator: %s", serviceID, removed.SchemaId, operator))
		match := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(serviceID), mutil.SchemaID(removed.SchemaId))
		schemaWrites = append(schemaWrites, mongo.NewDeleteOneModel().SetFilter(match))
	}

	// The service document always gets its schema ID list rewritten, even
	// when there are no schema-level operations to perform.
	serviceMatch := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceServiceID(serviceID))
	serviceChange := mutil.NewFilter(mutil.Set(mutil.NewFilter(mutil.ServiceSchemas(keptIDs))))
	serviceWrites := []mongo.WriteModel{
		mongo.NewUpdateOneModel().SetUpdate(serviceChange).SetFilter(serviceMatch),
	}

	if len(schemaWrites) > 0 {
		if _, writeErr := dmongo.GetClient().GetDB().Collection(model.CollectionSchema).BulkWrite(ctx, schemaWrites); writeErr != nil {
			return discovery.NewError(discovery.ErrInternal, writeErr.Error())
		}
	}

	if _, writeErr := dmongo.GetClient().GetDB().Collection(model.CollectionService).BulkWrite(ctx, serviceWrites); writeErr != nil {
		return discovery.NewError(discovery.ErrInternal, writeErr.Error())
	}
	return nil
}