in banyand/liaison/grpc/property.go [286:392]
func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryRequest) (resp *propertyv1.QueryResponse, err error) {
ps.metrics.totalStarted.Inc(1, "", "property", "query")
start := time.Now()
defer func() {
ps.metrics.totalFinished.Inc(1, "", "property", "query")
ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), "", "property", "query")
if err != nil {
ps.metrics.totalErr.Inc(1, "", "property", "query")
}
}()
if len(req.Groups) == 0 {
return nil, schema.BadRequest("groups", "groups should not be empty")
}
if req.Limit == 0 {
req.Limit = 100
}
var span *query.Span
if req.Trace {
tracer, _ := query.NewTracer(ctx, start.Format(time.RFC3339Nano))
span, _ = tracer.StartSpan(ctx, "property-grpc")
span.Tag("request", convert.BytesToString(logger.Proto(req)))
defer func() {
if err != nil {
span.Error(err)
} else {
resp.Trace = tracer.ToProto()
}
span.Stop()
}()
}
for _, gn := range req.Groups {
if g, getGroupErr := ps.schemaRegistry.GroupRegistry().GetGroup(ctx, gn); getGroupErr != nil {
return nil, errors.Errorf("group %s not found", gn)
} else if g.Catalog != commonv1.Catalog_CATALOG_PROPERTY {
return nil, errors.Errorf("group %s is not allowed to have properties", gn)
}
}
ff, err := ps.pipeline.Broadcast(defaultQueryTimeout, data.TopicPropertyQuery, bus.NewMessage(bus.MessageID(start.Unix()), req))
if err != nil {
return nil, err
}
res := make(map[string]*propertyv1.Property)
for _, f := range ff {
if m, getErr := f.Get(); getErr != nil {
err = multierr.Append(err, getErr)
} else {
d := m.Data()
if d == nil {
continue
}
switch v := d.(type) {
case *propertyv1.InternalQueryResponse:
for _, s := range v.Sources {
var p propertyv1.Property
err = protojson.Unmarshal(s, &p)
if err != nil {
return nil, err
}
entity := getEntity(&p)
cur, ok := res[entity]
if !ok {
res[entity] = &p
continue
}
if cur.Metadata.ModRevision < p.Metadata.ModRevision {
res[entity] = &p
err = ps.remove([][]byte{getPropertyID(cur)})
if err != nil {
return nil, err
}
}
}
if span != nil {
span.AddSubTrace(v.Trace)
}
case *common.Error:
err = multierr.Append(err, errors.New(v.Error()))
}
}
}
if err != nil {
return nil, err
}
if len(res) == 0 {
return &propertyv1.QueryResponse{Properties: nil}, nil
}
properties := make([]*propertyv1.Property, 0, len(res))
for _, p := range res {
if len(req.TagProjection) > 0 {
var tags []*modelv1.Tag
for _, tag := range p.Tags {
for _, tp := range req.TagProjection {
if tp == tag.Key {
tags = append(tags, tag)
break
}
}
}
p.Tags = tags
}
properties = append(properties, p)
if len(properties) >= int(req.Limit) {
break
}
}
return &propertyv1.QueryResponse{Properties: properties}, nil
}