func()

in datasource/etcd/schema.go [211:283]


// PutContent saves one schema of a service in a single etcd transaction.
//
// Two operation lists are prepared. The "common" list writes the schema ref
// key (pointing at the content hash) and the schema summary key, and, when
// the schema ID is not yet listed in the service's Schemas, also rewrites the
// service record with the ID appended. The "with content" list is the common
// list plus a write of the content body under its hash-derived key.
//
// The transaction is guarded by "content key does not exist": the "with
// content" list is passed as the operations for a satisfied guard and the
// common list as the fallback, so a body whose key is already stored is not
// written again. Every put is paired with the operations produced by
// sync.GenUpdateOpts for the same key.
//
// The error of the first failing step (service lookup, sync option
// generation, JSON marshalling, or the transaction) is logged and returned
// as is; nil is returned otherwise.
func (dao *SchemaDAO) PutContent(ctx context.Context, contentRequest *schema.PutContentRequest) error {
	domainProject := util.ParseDomainProject(ctx)
	serviceID, schemaID := contentRequest.ServiceID, contentRequest.SchemaID
	item := contentRequest.Content

	svc, err := datasource.GetMetadataManager().GetService(ctx, &discovery.GetServiceRequest{
		ServiceId: serviceID,
	})
	if err != nil {
		log.Error(fmt.Sprintf("get service[%s] failed", serviceID), err)
		return err
	}

	refKey := path.GenerateServiceSchemaRefKey(domainProject, serviceID, schemaID)
	contentKey := path.GenerateServiceSchemaContentKey(domainProject, item.Hash)
	summaryKey := path.GenerateServiceSchemaSummaryKey(domainProject, serviceID, schemaID)

	// Operations executed whether or not the content body is already stored.
	commonOps := []etcdadpt.OpOptions{
		etcdadpt.OpPut(etcdadpt.WithStrKey(refKey), etcdadpt.WithStrValue(item.Hash)),
		etcdadpt.OpPut(etcdadpt.WithStrKey(summaryKey), etcdadpt.WithStrValue(item.Summary)),
	}

	refSync, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, item.Hash, sync.WithOpts(map[string]string{"key": refKey}))
	if err != nil {
		log.Error("fail to create update opts", err)
		return err
	}
	summarySync, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, item.Summary, sync.WithOpts(map[string]string{"key": summaryKey}))
	if err != nil {
		log.Error("fail to create update opts", err)
		return err
	}
	commonOps = append(commonOps, refSync...)
	commonOps = append(commonOps, summarySync...)

	// A schema ID the service does not list yet is recorded on the service,
	// and the updated service record joins the transaction.
	if !util.SliceHave(svc.Schemas, schemaID) {
		svc.Schemas = append(svc.Schemas, schemaID)
		raw, marshalErr := json.Marshal(svc)
		if marshalErr != nil {
			log.Error("marshal service failed", marshalErr)
			return marshalErr
		}
		serviceKey := path.GenerateServiceKey(domainProject, serviceID)
		servicePut := etcdadpt.OpPut(etcdadpt.WithStrKey(serviceKey), etcdadpt.WithValue(raw))
		serviceSync, syncErr := sync.GenUpdateOpts(ctx, datasource.ResourceKV, raw, sync.WithOpts(map[string]string{"key": serviceKey}))
		if syncErr != nil {
			log.Error("fail to create update opts", syncErr)
			return syncErr
		}
		commonOps = append(commonOps, servicePut)
		commonOps = append(commonOps, serviceSync...)
	}

	// The branch that also stores the body: common operations followed by the
	// content put and its sync operations, assembled in a slice of its own.
	contentPut := etcdadpt.OpPut(etcdadpt.WithStrKey(contentKey), etcdadpt.WithStrValue(item.Content))
	contentSync, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, item.Content, sync.WithOpts(map[string]string{"key": contentKey}))
	if err != nil {
		log.Error("fail to create update opts", err)
		return err
	}
	withContentOps := make([]etcdadpt.OpOptions, 0, len(commonOps)+1+len(contentSync))
	withContentOps = append(withContentOps, commonOps...)
	withContentOps = append(withContentOps, contentPut)
	withContentOps = append(withContentOps, contentSync...)

	resp, err := etcdadpt.TxnWithCmp(ctx, withContentOps, etcdadpt.If(etcdadpt.NotExistKey(contentKey)), commonOps)
	if err != nil {
		log.Error(fmt.Sprintf("put kv[%s] failed", refKey), err)
		return err
	}
	if !resp.Succeeded {
		log.Info(fmt.Sprintf("put kv[%s] without content", refKey))
		return nil
	}
	log.Info(fmt.Sprintf("put kv[%s] and content[chars: %d]", refKey, len(item.Content)))
	return nil
}