func()

in datasource/mongo/ms.go [653:728]


func (ds *MetadataManager) modifySchemas(ctx context.Context, service *discovery.MicroService, schemas []*discovery.Schema) error {
	domain := util.ParseDomain(ctx)
	project := util.ParseProject(ctx)
	remoteIP := util.GetIPFromContext(ctx)
	serviceID := service.ServiceId
	filter := mutil.NewFilter(mutil.ServiceID(serviceID))
	schemasFromDatabase, err := dao.GetSchemas(ctx, filter)
	if err != nil {
		log.Error(fmt.Sprintf("modify service %s schemas failed, get schemas failed, operator: %s", serviceID, remoteIP), err)
		return discovery.NewError(discovery.ErrUnavailableBackend, err.Error())
	}

	needUpdateSchemas, needAddSchemas, needDeleteSchemas, _ :=
		datasource.SchemasAnalysis(schemas, schemasFromDatabase, service.Schemas)

	var schemasOps []mongo.WriteModel
	var serviceOps []mongo.WriteModel
	quotaSize := len(needAddSchemas) - len(needDeleteSchemas)
	if quotaSize > 0 {
		errQuota := quotasvc.ApplySchema(ctx, serviceID, int64(quotaSize))
		if errQuota != nil {
			log.Error(fmt.Sprintf("modify service[%s] schemas failed, operator: %s", serviceID, remoteIP), errQuota)
			return errQuota
		}
	}
	var schemaIDs []string
	for _, schema := range needAddSchemas {
		log.Info(fmt.Sprintf("add new schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
		schemasOps = append(schemasOps, mongo.NewInsertOneModel().SetDocument(&model.Schema{
			Domain:        domain,
			Project:       project,
			ServiceID:     serviceID,
			SchemaID:      schema.SchemaId,
			Schema:        schema.Schema,
			SchemaSummary: schema.Summary,
		}))
		schemaIDs = append(schemaIDs, schema.SchemaId)
	}

	for _, schema := range needUpdateSchemas {
		log.Info(fmt.Sprintf("update schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
		filter := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(serviceID), mutil.SchemaID(schema.SchemaId))
		setFilter := mutil.NewFilter(
			mutil.Schema(schema.Schema),
			mutil.SchemaSummary(schema.Summary),
		)
		updateFilter := mutil.NewFilter(
			mutil.Set(setFilter),
		)
		schemasOps = append(schemasOps, mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(updateFilter))
		schemaIDs = append(schemaIDs, schema.SchemaId)
	}

	for _, schema := range needDeleteSchemas {
		log.Info(fmt.Sprintf("delete non-existent schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
		filter := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(serviceID), mutil.SchemaID(schema.SchemaId))
		schemasOps = append(schemasOps, mongo.NewDeleteOneModel().SetFilter(filter))
	}
	filter = mutil.NewDomainProjectFilter(domain, project, mutil.ServiceServiceID(serviceID))
	setFilter := mutil.NewFilter(mutil.ServiceSchemas(schemaIDs))
	updateFilter := mutil.NewFilter(mutil.Set(setFilter))
	serviceOps = append(serviceOps, mongo.NewUpdateOneModel().SetUpdate(updateFilter).SetFilter(filter))
	if len(schemasOps) > 0 {
		_, err = dmongo.GetClient().GetDB().Collection(model.CollectionSchema).BulkWrite(ctx, schemasOps)
		if err != nil {
			return discovery.NewError(discovery.ErrInternal, err.Error())
		}
	}
	if len(serviceOps) > 0 {
		_, err = dmongo.GetClient().GetDB().Collection(model.CollectionService).BulkWrite(ctx, serviceOps)
		if err != nil {
			return discovery.NewError(discovery.ErrInternal, err.Error())
		}
	}
	return nil
}