func()

in datasource/etcd/ms.go [1312:1378]


// modifySchemas reconciles the schemas stored for service with the requested
// schemas and commits the resulting changes in one transaction.
//
// Steps, in order:
//  1. Load the schemas currently stored for the service.
//  2. Ask datasource.SchemasAnalysis which schemas must be updated, added or
//     deleted.
//  3. If more schemas are added than deleted, apply for schema quota for the
//     net increase.
//  4. Collect the put/delete operations for every affected schema, then set
//     service.Schemas to the IDs of the added and updated schemas and collect
//     the service update operations.
//  5. Execute all operations with TxnWithCmp, guarded by a version comparison
//     on the service key.
//
// Note that the passed-in service is mutated (service.Schemas is overwritten)
// before the transaction runs, and it is not restored if a later step fails.
//
// Returns nil on success. Otherwise it returns:
//   - pb.ErrUnavailableBackend when loading the stored schemas or executing the
//     transaction fails,
//   - the error from quotasvc.ApplySchema when the quota request is rejected,
//   - pb.ErrInternal when building the service update operations fails,
//   - pb.ErrServiceNotExists when the transaction's guard does not hold.
func (ds *MetadataManager) modifySchemas(ctx context.Context, domainProject string, service *pb.MicroService,
	schemas []*pb.Schema) error {
	remoteIP := util.GetIPFromContext(ctx)
	serviceID := service.ServiceId
	schemasFromDatabase, err := getSchemasFromDatabase(ctx, domainProject, serviceID)
	if err != nil {
		// Pass err (not nil) so the root cause is visible in the log.
		log.Error(fmt.Sprintf("modify service[%s] schemas failed, get schemas failed, operator: %s",
			serviceID, remoteIP), err)
		return pb.NewError(pb.ErrUnavailableBackend, err.Error())
	}

	// The fourth result of SchemasAnalysis is not needed here and is discarded.
	needUpdateSchemas, needAddSchemas, needDeleteSchemas, _ :=
		datasource.SchemasAnalysis(schemas, schemasFromDatabase, service.Schemas)

	pluginOps := make([]etcdadpt.OpOptions, 0)
	// Quota is only requested for the net growth in schema count.
	quotaSize := len(needAddSchemas) - len(needDeleteSchemas)
	if quotaSize > 0 {
		errQuota := quotasvc.ApplySchema(ctx, serviceID, int64(quotaSize))
		if errQuota != nil {
			log.Error(fmt.Sprintf("modify service[%s] schemas failed, operator: %s", serviceID, remoteIP), errQuota)
			return errQuota
		}
	}

	// NOTE(review): the second result of putSchema/deleteSchema is discarded
	// below. Its type is not visible from this function; if it is an error it
	// should be checked before the operations are committed. TODO confirm.
	var schemaIDs []string
	for _, schema := range needAddSchemas {
		log.Info(fmt.Sprintf("add new schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
		opts, _ := putSchema(ctx, domainProject, service.ServiceId, schema)
		pluginOps = append(pluginOps, opts...)
		schemaIDs = append(schemaIDs, schema.SchemaId)
	}

	for _, schema := range needUpdateSchemas {
		log.Info(fmt.Sprintf("update schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
		opts, _ := putSchema(ctx, domainProject, serviceID, schema)
		pluginOps = append(pluginOps, opts...)
		schemaIDs = append(schemaIDs, schema.SchemaId)
	}

	for _, schema := range needDeleteSchemas {
		log.Info(fmt.Sprintf("delete non-existent schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
		opts, _ := deleteSchema(ctx, domainProject, serviceID, schema)
		pluginOps = append(pluginOps, opts...)
	}

	// The service record lists exactly the added and updated schema IDs.
	service.Schemas = schemaIDs
	opts, err := eutil.UpdateService(ctx, domainProject, serviceID, service)
	if err != nil {
		log.Error(fmt.Sprintf("modify service[%s] schemas failed, update service.Schemas failed, operator: %s",
			serviceID, remoteIP), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}
	pluginOps = append(pluginOps, opts...)

	if len(pluginOps) != 0 {
		// The operations are applied only if the version comparison on the
		// service key holds; otherwise resp.Succeeded is false.
		resp, err := etcdadpt.TxnWithCmp(ctx, pluginOps,
			etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, serviceID), 0)),
			nil)
		if err != nil {
			log.Error(fmt.Sprintf("modify service[%s] schemas failed, commit transaction failed, operator: %s",
				serviceID, remoteIP), err)
			return pb.NewError(pb.ErrUnavailableBackend, err.Error())
		}
		if !resp.Succeeded {
			log.Error(fmt.Sprintf("modify service[%s] schemas failed, service does not exist, operator: %s",
				serviceID, remoteIP), nil)
			return pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
		}
	}
	return nil
}