in datasource/etcd/ms.go [1312:1378]
// modifySchemas reconciles the schemas stored for a service with the given
// schemas list and commits the result in one transaction.
//
// Steps, in order:
//  1. read the schemas currently stored for the service;
//  2. let datasource.SchemasAnalysis split the request into schemas to add,
//     update and delete;
//  3. apply for schema quota when more schemas are added than deleted;
//  4. collect the put/delete operations plus the updated service record and
//     submit them together, guarded by a compare on the service key.
//
// Side effect: service.Schemas is overwritten with the IDs of the added
// schemas followed by the updated schemas. The assignment happens before the
// transaction is submitted, so the caller's struct is modified even when a
// later step returns an error.
//
// Returned errors:
//   - pb.ErrUnavailableBackend when the stored schemas cannot be read or the
//     transaction call fails;
//   - the error from quotasvc.ApplySchema, unchanged, when quota is refused;
//   - pb.ErrInternal when eutil.UpdateService fails;
//   - pb.ErrServiceNotExists when the transaction's compare does not hold.
func (ds *MetadataManager) modifySchemas(ctx context.Context, domainProject string, service *pb.MicroService,
	schemas []*pb.Schema) error {
	remoteIP := util.GetIPFromContext(ctx)
	serviceID := service.ServiceId

	schemasFromDatabase, err := getSchemasFromDatabase(ctx, domainProject, serviceID)
	if err != nil {
		// Pass the real error so the root cause shows up in the log.
		log.Error(fmt.Sprintf("modify service[%s] schemas failed, get schemas failed, operator: %s",
			serviceID, remoteIP), err)
		return pb.NewError(pb.ErrUnavailableBackend, err.Error())
	}

	// The fourth result of SchemasAnalysis is not needed here.
	needUpdateSchemas, needAddSchemas, needDeleteSchemas, _ :=
		datasource.SchemasAnalysis(schemas, schemasFromDatabase, service.Schemas)

	pluginOps := make([]etcdadpt.OpOptions, 0)

	// Quota is only requested for the net growth in schema count.
	quotaSize := len(needAddSchemas) - len(needDeleteSchemas)
	if quotaSize > 0 {
		errQuota := quotasvc.ApplySchema(ctx, serviceID, int64(quotaSize))
		if errQuota != nil {
			log.Error(fmt.Sprintf("modify service[%s] schemas failed, operator: %s", serviceID, remoteIP), errQuota)
			return errQuota
		}
	}

	// NOTE(review): the second return value of putSchema/deleteSchema is
	// discarded below. Its type is not visible from this function; if it is an
	// error it should be checked before the transaction is built. TODO confirm.
	var schemaIDs []string
	for _, schema := range needAddSchemas {
		log.Info(fmt.Sprintf("add new schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
		opts, _ := putSchema(ctx, domainProject, service.ServiceId, schema)
		pluginOps = append(pluginOps, opts...)
		schemaIDs = append(schemaIDs, schema.SchemaId)
	}
	for _, schema := range needUpdateSchemas {
		log.Info(fmt.Sprintf("update schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
		opts, _ := putSchema(ctx, domainProject, serviceID, schema)
		pluginOps = append(pluginOps, opts...)
		schemaIDs = append(schemaIDs, schema.SchemaId)
	}
	for _, schema := range needDeleteSchemas {
		log.Info(fmt.Sprintf("delete non-existent schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
		opts, _ := deleteSchema(ctx, domainProject, serviceID, schema)
		pluginOps = append(pluginOps, opts...)
	}

	// The service record keeps only the IDs of schemas that were added or
	// updated; deleted ones are dropped from the list.
	service.Schemas = schemaIDs
	opts, err := eutil.UpdateService(ctx, domainProject, serviceID, service)
	if err != nil {
		log.Error(fmt.Sprintf("modify service[%s] schemas failed, update service.Schemas failed, operator: %s",
			serviceID, remoteIP), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}
	pluginOps = append(pluginOps, opts...)

	if len(pluginOps) == 0 {
		return nil
	}

	// Only apply the operations if the service key's version is not 0; a
	// failed compare is reported to the caller as "service does not exist".
	resp, txnErr := etcdadpt.TxnWithCmp(ctx, pluginOps,
		etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, serviceID), 0)),
		nil)
	if txnErr != nil {
		log.Error(fmt.Sprintf("modify service[%s] schemas failed, commit transaction failed, operator: %s",
			serviceID, remoteIP), txnErr)
		return pb.NewError(pb.ErrUnavailableBackend, txnErr.Error())
	}
	if !resp.Succeeded {
		return pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
	}
	return nil
}