in banyand/liaison/grpc/property.go [76:178]
// Apply validates a property against its group and schema and then hands it to
// replaceProperty or mergeProperty, depending on req.Strategy.
//
// The steps are:
//  1. Reject requests with a missing property, metadata, group, name, id or tags
//     via schema.BadRequest. These early rejections happen before any metric is
//     recorded.
//  2. Look up the group and require the PROPERTY catalog, resource options and a
//     non-zero shard number.
//  3. Look up the property schema and require every supplied tag to exist in it
//     with a matching type.
//  4. Query the properties already stored under the same group/name/id. The
//     first one is passed on as the previous version, and all of them are
//     removed through ps.remove once the apply has succeeded.
//  5. Compute the shard from the entity, locate the owning node and dispatch.
//
// The named results are required: the deferred closures read and write err.
func (ps *propertyServer) Apply(ctx context.Context, req *propertyv1.ApplyRequest) (resp *propertyv1.ApplyResponse, err error) {
	// GetProperty is the nil-safe generated getter. A request that omits the
	// property field must be rejected here; dereferencing it would panic.
	property := req.GetProperty()
	if property == nil {
		return nil, schema.BadRequest("property", "property should not be nil")
	}
	if property.Metadata == nil {
		return nil, schema.BadRequest("metadata", "metadata should not be nil")
	}
	if property.Metadata.Group == "" {
		return nil, schema.BadRequest("metadata.group", "group should not be nil")
	}
	if property.Metadata.Name == "" {
		return nil, schema.BadRequest("metadata.name", "name should not be empty")
	}
	if property.Id == "" {
		return nil, schema.BadRequest("id", "id should not be empty")
	}
	if len(property.Tags) == 0 {
		return nil, schema.BadRequest("tags", "tags should not be empty")
	}

	g := property.Metadata.Group
	ps.metrics.totalStarted.Inc(1, g, "property", "apply")
	start := time.Now()
	// Registered first, so it runs last: it observes the final value of err,
	// including an error set by the cleanup closure deferred further below.
	defer func() {
		ps.metrics.totalFinished.Inc(1, g, "property", "apply")
		ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, "property", "apply")
		if err != nil {
			ps.metrics.totalErr.Inc(1, g, "property", "apply")
		}
	}()

	var group *commonv1.Group
	if group, err = ps.schemaRegistry.GroupRegistry().GetGroup(ctx, g); err != nil {
		// NOTE(review): the underlying lookup error is replaced, not wrapped, so
		// every failure is reported to the caller as "not found".
		return nil, errors.Errorf("group %s not found", g)
	}
	if group.Catalog != commonv1.Catalog_CATALOG_PROPERTY {
		return nil, errors.Errorf("group %s is not allowed to have properties", g)
	}
	if group.ResourceOpts == nil {
		return nil, errors.Errorf("group %s has no resource options", g)
	}
	if group.ResourceOpts.ShardNum == 0 {
		return nil, errors.Errorf("group %s has no shard number", g)
	}

	var propSchema *databasev1.Property
	propSchema, err = ps.schemaRegistry.PropertyRegistry().GetProperty(ctx, &commonv1.Metadata{
		Group: g,
		Name:  property.Metadata.Name,
	})
	if err != nil {
		return nil, err
	}
	if len(propSchema.Tags) < len(property.Tags) {
		return nil, errors.Errorf("property %s tags count mismatch", property.Metadata.Name)
	}
	for _, tag := range property.Tags {
		found := false
		for _, ts := range propSchema.Tags {
			if ts.Name == tag.Key {
				typ := databasev1.TagType(pbv1.MustTagValueToValueType(tag.Value))
				// A value that maps to TAG_TYPE_UNSPECIFIED skips the type check.
				if typ != databasev1.TagType_TAG_TYPE_UNSPECIFIED && ts.Type != typ {
					return nil, errors.Errorf("property %s tag %s type mismatch", property.Metadata.Name, tag.Key)
				}
				found = true
			}
		}
		if !found {
			return nil, errors.Errorf("property %s tag %s not found", property.Metadata.Name, tag.Key)
		}
	}

	// err below is the named result (same scope), so := assigns to it rather
	// than shadowing it; the deferred closures rely on that.
	qResp, err := ps.Query(ctx, &propertyv1.QueryRequest{
		Groups: []string{g},
		Name:   property.Metadata.Name,
		Ids:    []string{property.Id},
	})
	if err != nil {
		return nil, err
	}
	var prev *propertyv1.Property
	if len(qResp.Properties) > 0 {
		prev = qResp.Properties[0]
		// Remove the previously stored properties only after the apply itself
		// succeeded. A removal failure becomes the error of the whole call.
		defer func() {
			if err == nil {
				ids := make([][]byte, 0, len(qResp.Properties))
				for _, p := range qResp.Properties {
					ids = append(ids, getPropertyID(p))
				}
				if err = ps.remove(ids); err != nil {
					err = multierr.Append(err, errors.New("fail to remove old properties"))
				}
			}
		}()
	}

	entity := getEntity(property)
	id, err := partition.ShardID(convert.StringToBytes(entity), group.ResourceOpts.ShardNum)
	if err != nil {
		return nil, err
	}
	node, err := ps.nodeRegistry.Locate(g, entity, uint32(id))
	if err != nil {
		return nil, err
	}
	if req.Strategy == propertyv1.ApplyRequest_STRATEGY_REPLACE {
		return ps.replaceProperty(ctx, start, uint64(id), node, prev, property)
	}
	return ps.mergeProperty(ctx, start, uint64(id), node, prev, property)
}