func()

in banyand/liaison/grpc/property.go [76:178]


// Apply validates an incoming property and writes it, replacing or merging
// with any previously stored property that has the same group, name and id.
//
// The request is processed in these steps:
//  1. Basic validation: the property, its metadata, group, name, id and tags
//     must all be present; violations are reported via schema.BadRequest.
//     These failures happen before metrics are started and are not counted.
//  2. Group validation: the group must be resolvable, have the property
//     catalog, and carry resource options with a non-zero shard number.
//  3. Schema validation: the request must not carry more tags than the
//     property schema defines, every tag must exist in the schema, and the
//     tag value type must match the schema type unless the value maps to
//     TAG_TYPE_UNSPECIFIED.
//  4. Existing properties with the same group/name/id are queried. The first
//     one is handed to the write path as the previous version, and all of
//     them are removed after a successful write.
//  5. The shard and node are resolved from the property entity and the write
//     is dispatched to replaceProperty (STRATEGY_REPLACE) or mergeProperty
//     (any other strategy).
//
// The named results are required: the deferred functions read err to record
// error metrics and may overwrite it when the cleanup of old properties fails.
func (ps *propertyServer) Apply(ctx context.Context, req *propertyv1.ApplyRequest) (resp *propertyv1.ApplyResponse, err error) {
	// Guard against a missing property (or request) before touching any of
	// its fields; otherwise the field accesses below would panic.
	if req == nil || req.Property == nil {
		return nil, schema.BadRequest("property", "property should not be nil")
	}
	property := req.Property
	if property.Metadata == nil {
		return nil, schema.BadRequest("metadata", "metadata should not be nil")
	}
	if property.Metadata.Group == "" {
		return nil, schema.BadRequest("metadata.group", "group should not be nil")
	}
	if property.Metadata.Name == "" {
		return nil, schema.BadRequest("metadata.name", "name should not be empty")
	}
	if property.Id == "" {
		return nil, schema.BadRequest("id", "id should not be empty")
	}
	if len(property.Tags) == 0 {
		return nil, schema.BadRequest("tags", "tags should not be empty")
	}
	g := property.Metadata.Group
	ps.metrics.totalStarted.Inc(1, g, "property", "apply")
	start := time.Now()
	// Registered first, so it runs last and observes the final value of err,
	// including an error set by the cleanup defer below.
	defer func() {
		ps.metrics.totalFinished.Inc(1, g, "property", "apply")
		ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, "property", "apply")
		if err != nil {
			ps.metrics.totalErr.Inc(1, g, "property", "apply")
		}
	}()
	var group *commonv1.Group
	// NOTE(review): the underlying lookup error is discarded here, so every
	// failure of GetGroup is reported as "not found".
	if group, err = ps.schemaRegistry.GroupRegistry().GetGroup(ctx, g); err != nil {
		return nil, errors.Errorf("group %s not found", g)
	}
	if group.Catalog != commonv1.Catalog_CATALOG_PROPERTY {
		return nil, errors.Errorf("group %s is not allowed to have properties", g)
	}
	if group.ResourceOpts == nil {
		return nil, errors.Errorf("group %s has no resource options", g)
	}
	if group.ResourceOpts.ShardNum == 0 {
		return nil, errors.Errorf("group %s has no shard number", g)
	}
	var propSchema *databasev1.Property
	propSchema, err = ps.schemaRegistry.PropertyRegistry().GetProperty(ctx, &commonv1.Metadata{
		Group: g,
		Name:  property.Metadata.Name,
	})
	if err != nil {
		return nil, err
	}
	// A request may carry a subset of the schema tags, but never more.
	if len(propSchema.Tags) < len(property.Tags) {
		return nil, errors.Errorf("property %s tags count mismatch", property.Metadata.Name)
	}
	for _, tag := range property.Tags {
		found := false
		for _, ts := range propSchema.Tags {
			if ts.Name == tag.Key {
				typ := databasev1.TagType(pbv1.MustTagValueToValueType(tag.Value))
				// An unspecified value type is accepted for any schema type.
				if typ != databasev1.TagType_TAG_TYPE_UNSPECIFIED && ts.Type != typ {
					return nil, errors.Errorf("property %s tag %s type mismatch", property.Metadata.Name, tag.Key)
				}
				found = true
			}
		}
		if !found {
			return nil, errors.Errorf("property %s tag %s not found", property.Metadata.Name, tag.Key)
		}
	}
	// Look up the currently stored versions of this property, if any.
	qResp, err := ps.Query(ctx, &propertyv1.QueryRequest{
		Groups: []string{g},
		Name:   property.Metadata.Name,
		Ids:    []string{property.Id},
	})
	if err != nil {
		return nil, err
	}
	var prev *propertyv1.Property
	if len(qResp.Properties) > 0 {
		prev = qResp.Properties[0]
		// Once the new version has been written successfully, remove every
		// previously stored version returned by the query.
		defer func() {
			if err == nil {
				ids := make([][]byte, 0, len(qResp.Properties))
				for _, p := range qResp.Properties {
					ids = append(ids, getPropertyID(p))
				}
				if err = ps.remove(ids); err != nil {
					err = multierr.Append(err, errors.New("fail to remove old properties"))
				}
			}
		}()
	}
	entity := getEntity(property)
	id, err := partition.ShardID(convert.StringToBytes(entity), group.ResourceOpts.ShardNum)
	if err != nil {
		return nil, err
	}
	node, err := ps.nodeRegistry.Locate(g, entity, uint32(id))
	if err != nil {
		return nil, err
	}
	if req.Strategy == propertyv1.ApplyRequest_STRATEGY_REPLACE {
		return ps.replaceProperty(ctx, start, uint64(id), node, prev, property)
	}
	return ps.mergeProperty(ctx, start, uint64(id), node, prev, property)
}