in datasource/etcd/schema.go [211:283]
// PutContent writes the reference and summary entries for the schema named in
// contentRequest and, in the same transaction, the schema content itself when
// no entry exists yet under the content hash key.
//
// If the schema ID is not already listed in the service's Schemas, it is
// appended and the re-marshalled service is written in the same transaction.
// Every put is paired with the ops returned by sync.GenUpdateOpts for the same
// key and value.
//
// It returns the first error from looking up the service, marshalling it,
// generating the update ops, or executing the transaction.
func (dao *SchemaDAO) PutContent(ctx context.Context, contentRequest *schema.PutContentRequest) error {
	var (
		domainProject = util.ParseDomainProject(ctx)
		serviceID     = contentRequest.ServiceID
		schemaID      = contentRequest.SchemaID
		content       = contentRequest.Content
	)

	svc, err := datasource.GetMetadataManager().GetService(ctx, &discovery.GetServiceRequest{ServiceId: serviceID})
	if err != nil {
		log.Error(fmt.Sprintf("get service[%s] failed", serviceID), err)
		return err
	}

	refKey := path.GenerateServiceSchemaRefKey(domainProject, serviceID, schemaID)
	contentKey := path.GenerateServiceSchemaContentKey(domainProject, content.Hash)
	summaryKey := path.GenerateServiceSchemaSummaryKey(domainProject, serviceID, schemaID)

	// appendUpdateOpts generates the update ops for key/value and returns ops
	// with them appended; failures are logged here so callers only propagate.
	appendUpdateOpts := func(ops []etcdadpt.OpOptions, key string, value interface{}) ([]etcdadpt.OpOptions, error) {
		generated, genErr := sync.GenUpdateOpts(ctx, datasource.ResourceKV, value,
			sync.WithOpts(map[string]string{"key": key}))
		if genErr != nil {
			log.Error("fail to create update opts", genErr)
			return nil, genErr
		}
		return append(ops, generated...), nil
	}

	// Ops used when the content key already exists: only the ref, the summary
	// and (possibly) the service record are written.
	withoutContent := []etcdadpt.OpOptions{
		etcdadpt.OpPut(etcdadpt.WithStrKey(refKey), etcdadpt.WithStrValue(content.Hash)),
		etcdadpt.OpPut(etcdadpt.WithStrKey(summaryKey), etcdadpt.WithStrValue(content.Summary)),
	}
	if withoutContent, err = appendUpdateOpts(withoutContent, refKey, content.Hash); err != nil {
		return err
	}
	if withoutContent, err = appendUpdateOpts(withoutContent, summaryKey, content.Summary); err != nil {
		return err
	}

	// A schema ID not yet listed on the service is added to service.Schemas
	// and the updated service is persisted alongside the schema keys.
	if !util.SliceHave(svc.Schemas, schemaID) {
		svc.Schemas = append(svc.Schemas, schemaID)
		body, marshalErr := json.Marshal(svc)
		if marshalErr != nil {
			log.Error("marshal service failed", marshalErr)
			return marshalErr
		}
		serviceKey := path.GenerateServiceKey(domainProject, serviceID)
		withoutContent = append(withoutContent,
			etcdadpt.OpPut(etcdadpt.WithStrKey(serviceKey), etcdadpt.WithValue(body)))
		if withoutContent, err = appendUpdateOpts(withoutContent, serviceKey, body); err != nil {
			return err
		}
	}

	// Ops used when the content key is absent: everything above plus the
	// content itself. Built from a copy so the two lists stay independent.
	withContent := make([]etcdadpt.OpOptions, 0, len(withoutContent)+1)
	withContent = append(withContent, withoutContent...)
	withContent = append(withContent,
		etcdadpt.OpPut(etcdadpt.WithStrKey(contentKey), etcdadpt.WithStrValue(content.Content)))
	if withContent, err = appendUpdateOpts(withContent, contentKey, content.Content); err != nil {
		return err
	}

	resp, err := etcdadpt.TxnWithCmp(ctx, withContent, etcdadpt.If(etcdadpt.NotExistKey(contentKey)), withoutContent)
	if err != nil {
		log.Error(fmt.Sprintf("put kv[%s] failed", refKey), err)
		return err
	}

	if !resp.Succeeded {
		log.Info(fmt.Sprintf("put kv[%s] without content", refKey))
		return nil
	}
	log.Info(fmt.Sprintf("put kv[%s] and content[chars: %d]", refKey, len(content.Content)))
	return nil
}