func()

in datasource/etcd/dep.go [114:172]


// PutDependencies queues the given consumer dependency requests.
//
// For every entry it checks the consumer/provider keys with
// datasource.ParamsChecker, looks up the consumer's service ID, stamps the
// entry's Override field with override (the caller's struct is modified),
// JSON-encodes it and prepares a put on the consumer's dependency-queue key.
// The sync options produced by esync.GenUpdateOpts for that key are appended
// as well. All collected operations are then handed to one etcdadpt.Txn call.
//
// When override is true the queue key is built with path.DepsQueueUUID;
// otherwise each entry gets a key built from a freshly generated UUID.
//
// Errors: a ParamsChecker failure is returned unchanged; a consumer whose
// service ID is empty yields pb.ErrServiceNotExists; every other failure is
// reported as pb.ErrInternal. Nothing is written if any entry fails, because
// the transaction is only issued after the whole slice has been processed.
func (dm *DepManager) PutDependencies(ctx context.Context, dependencyInfos []*pb.ConsumerDependency, override bool) error {
	domainProject := util.ParseDomainProject(ctx)
	txnOps := make([]etcdadpt.OpOptions, 0, len(dependencyInfos))

	for _, dep := range dependencyInfos {
		// Human-readable consumer identity, used only in log and error text.
		consumerParts := []string{dep.Consumer.Environment, dep.Consumer.AppId, dep.Consumer.ServiceName, dep.Consumer.Version}
		consumerFlag := util.StringJoin(consumerParts, "/")

		consumerKeys := pb.DependenciesToKeys([]*pb.MicroServiceKey{dep.Consumer}, domainProject)
		consumerKey := consumerKeys[0]
		providerKeys := pb.DependenciesToKeys(dep.Providers, domainProject)

		checkErr := datasource.ParamsChecker(consumerKey, providerKeys)
		if checkErr != nil {
			msg := fmt.Sprintf("put request into dependency queue failed, override: %t, consumer is %s",
				override, consumerFlag)
			log.Error(msg, checkErr)
			return checkErr
		}

		consumerID, idErr := eutil.GetServiceID(ctx, consumerKey)
		switch {
		case idErr != nil:
			msg := fmt.Sprintf("put request into dependency queue failed, override: %t, get consumer[%s] id failed",
				override, consumerFlag)
			log.Error(msg, idErr)
			return pb.NewError(pb.ErrInternal, idErr.Error())
		case len(consumerID) == 0:
			msg := fmt.Sprintf("put request into dependency queue failed, override: %t, consumer[%s] does not exist",
				override, consumerFlag)
			log.Error(msg, nil)
			return pb.NewError(pb.ErrServiceNotExists, fmt.Sprintf("Consumer %s does not exist.", consumerFlag))
		}

		// The override flag travels inside the serialized payload.
		dep.Override = override
		payload, marshalErr := json.Marshal(dep)
		if marshalErr != nil {
			msg := fmt.Sprintf("put request into dependency queue failed, override: %t, marshal consumer[%s] dependency failed",
				override, consumerFlag)
			log.Error(msg, marshalErr)
			return pb.NewError(pb.ErrInternal, marshalErr.Error())
		}

		// Override requests share the fixed queue ID; others get a unique one.
		queueID := path.DepsQueueUUID
		if !override {
			queueID = util.GenerateUUID()
		}
		queueKey := path.GenerateConsumerDependencyQueueKey(domainProject, consumerID, queueID)

		putOp := etcdadpt.OpPut(etcdadpt.WithStrKey(queueKey), etcdadpt.WithValue(payload))
		txnOps = append(txnOps, putOp)

		syncOps, syncErr := esync.GenUpdateOpts(ctx, datasource.ResourceKV, payload,
			esync.WithOpts(map[string]string{"key": queueKey}))
		if syncErr != nil {
			log.Error("fail to create sync opts", syncErr)
			return pb.NewError(pb.ErrInternal, syncErr.Error())
		}
		txnOps = append(txnOps, syncOps...)
	}

	if txnErr := etcdadpt.Txn(ctx, txnOps); txnErr != nil {
		msg := fmt.Sprintf("put request into dependency queue failed, override: %t, %v",
			override, dependencyInfos)
		log.Error(msg, txnErr)
		return pb.NewError(pb.ErrInternal, txnErr.Error())
	}

	remoteIP := util.GetIPFromContext(ctx)
	log.Info(fmt.Sprintf("put request into dependency queue successfully, override: %t, %v, from remote %s",
		override, dependencyInfos, remoteIP))
	return nil
}