in datasource/etcd/dep.go [114:172]
// PutDependencies enqueues the given consumer dependency requests.
//
// For every entry it checks the consumer/provider keys with
// datasource.ParamsChecker, looks up the consumer's service ID, stamps the
// entry's Override field with override, and JSON-encodes it. The encoded entry
// is scheduled as a put on the consumer's dependency queue key, together with
// the operations produced by esync.GenUpdateOpts for that key. All collected
// operations are then submitted through one etcdadpt.Txn call.
//
// When override is true the queue key uses path.DepsQueueUUID as its ID;
// otherwise a fresh ID from util.GenerateUUID is used for each entry.
//
// Note that each element of dependencyInfos is modified in place (its Override
// field is set).
//
// Errors:
//   - the error from datasource.ParamsChecker is returned unchanged;
//   - pb.ErrServiceNotExists when the consumer's service ID is empty;
//   - pb.ErrInternal when the service ID lookup, JSON encoding, sync option
//     generation, or the transaction itself fails.
func (dm *DepManager) PutDependencies(ctx context.Context, dependencyInfos []*pb.ConsumerDependency, override bool) error {
	domainProject := util.ParseDomainProject(ctx)
	ops := make([]etcdadpt.OpOptions, 0, len(dependencyInfos))

	for _, dep := range dependencyInfos {
		consumer := dep.Consumer
		// Human-readable consumer identity, used only in log and error messages.
		consumerDesc := util.StringJoin([]string{consumer.Environment, consumer.AppId, consumer.ServiceName, consumer.Version}, "/")

		consumerKey := pb.DependenciesToKeys([]*pb.MicroServiceKey{consumer}, domainProject)[0]
		providerKeys := pb.DependenciesToKeys(dep.Providers, domainProject)
		if err := datasource.ParamsChecker(consumerKey, providerKeys); err != nil {
			log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t, consumer is %s",
				override, consumerDesc), err)
			return err
		}

		consumerID, err := eutil.GetServiceID(ctx, consumerKey)
		if err != nil {
			log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t, get consumer[%s] id failed",
				override, consumerDesc), err)
			return pb.NewError(pb.ErrInternal, err.Error())
		}
		// An empty ID with no error is reported as a missing consumer.
		if len(consumerID) == 0 {
			log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t, consumer[%s] does not exist",
				override, consumerDesc), nil)
			return pb.NewError(pb.ErrServiceNotExists, fmt.Sprintf("Consumer %s does not exist.", consumerDesc))
		}

		// The override flag is set before encoding so it is part of the queued payload.
		dep.Override = override
		payload, err := json.Marshal(dep)
		if err != nil {
			log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t, marshal consumer[%s] dependency failed",
				override, consumerDesc), err)
			return pb.NewError(pb.ErrInternal, err.Error())
		}

		// Override requests share the fixed queue ID; all others get a unique one.
		queueID := path.DepsQueueUUID
		if !override {
			queueID = util.GenerateUUID()
		}
		queueKey := path.GenerateConsumerDependencyQueueKey(domainProject, consumerID, queueID)
		ops = append(ops, etcdadpt.OpPut(etcdadpt.WithStrKey(queueKey), etcdadpt.WithValue(payload)))

		syncOps, err := esync.GenUpdateOpts(ctx, datasource.ResourceKV, payload,
			esync.WithOpts(map[string]string{"key": queueKey}))
		if err != nil {
			log.Error("fail to create sync opts", err)
			return pb.NewError(pb.ErrInternal, err.Error())
		}
		ops = append(ops, syncOps...)
	}

	// Nothing is written until every entry has been prepared successfully.
	if err := etcdadpt.Txn(ctx, ops); err != nil {
		log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t, %v",
			override, dependencyInfos), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}

	log.Info(fmt.Sprintf("put request into dependency queue successfully, override: %t, %v, from remote %s",
		override, dependencyInfos, util.GetIPFromContext(ctx)))
	return nil
}