func()

in banyand/liaison/grpc/property.go [286:392]


// Query broadcasts the request on the property-query topic and merges the
// replies into a single response.
//
// The request is rejected when no group is given, when a group cannot be
// fetched from the group registry, or when a group's catalog is not
// CATALOG_PROPERTY. A zero Limit is replaced by 100 (the request is mutated).
//
// Properties from all replies are de-duplicated by entity: the one with the
// highest ModRevision is kept, and a previously collected property that gets
// superseded by a newer revision is passed to ps.remove. Any failure reported
// by a reply (a future error or a *common.Error payload) is accumulated and
// makes the whole call fail after all replies have been inspected.
//
// When req.TagProjection is non-empty, only the tags whose key is listed are
// kept on each returned property. At most req.Limit properties are returned;
// they come from a map iteration, so their order is unspecified.
//
// When req.Trace is set, a "property-grpc" span is recorded and the trace is
// attached to the successful response.
func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryRequest) (resp *propertyv1.QueryResponse, err error) {
	ps.metrics.totalStarted.Inc(1, "", "property", "query")
	start := time.Now()
	// err is the named result, so this deferred block observes the final error.
	defer func() {
		ps.metrics.totalFinished.Inc(1, "", "property", "query")
		ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), "", "property", "query")
		if err != nil {
			ps.metrics.totalErr.Inc(1, "", "property", "query")
		}
	}()
	if len(req.Groups) == 0 {
		return nil, schema.BadRequest("groups", "groups should not be empty")
	}
	if req.Limit == 0 {
		req.Limit = 100
	}
	var span *query.Span
	if req.Trace {
		tracer, _ := query.NewTracer(ctx, start.Format(time.RFC3339Nano))
		span, _ = tracer.StartSpan(ctx, "property-grpc")
		span.Tag("request", convert.BytesToString(logger.Proto(req)))
		defer func() {
			if err != nil {
				span.Error(err)
			} else {
				// resp is the named result and is non-nil on every success path.
				resp.Trace = tracer.ToProto()
			}
			span.Stop()
		}()
	}
	for _, gn := range req.Groups {
		if g, getGroupErr := ps.schemaRegistry.GroupRegistry().GetGroup(ctx, gn); getGroupErr != nil {
			return nil, errors.Errorf("group %s not found", gn)
		} else if g.Catalog != commonv1.Catalog_CATALOG_PROPERTY {
			return nil, errors.Errorf("group %s is not allowed to have properties", gn)
		}
	}
	ff, err := ps.pipeline.Broadcast(defaultQueryTimeout, data.TopicPropertyQuery, bus.NewMessage(bus.MessageID(start.Unix()), req))
	if err != nil {
		return nil, err
	}
	// res maps an entity to the newest property seen so far for it.
	res := make(map[string]*propertyv1.Property)
	for _, f := range ff {
		m, getErr := f.Get()
		if getErr != nil {
			err = multierr.Append(err, getErr)
			continue
		}
		d := m.Data()
		if d == nil {
			continue
		}
		switch v := d.(type) {
		case *propertyv1.InternalQueryResponse:
			for _, s := range v.Sources {
				var p propertyv1.Property
				// Use scoped error variables here: assigning to err would reset
				// the failures accumulated from earlier replies to nil and let a
				// partially failed query be reported as a success.
				if unmarshalErr := protojson.Unmarshal(s, &p); unmarshalErr != nil {
					return nil, unmarshalErr
				}
				entity := getEntity(&p)
				cur, ok := res[entity]
				if !ok {
					res[entity] = &p
					continue
				}
				if cur.Metadata.ModRevision < p.Metadata.ModRevision {
					res[entity] = &p
					// The previously collected revision has been superseded.
					if removeErr := ps.remove([][]byte{getPropertyID(cur)}); removeErr != nil {
						return nil, removeErr
					}
				}
			}
			if span != nil {
				span.AddSubTrace(v.Trace)
			}
		case *common.Error:
			err = multierr.Append(err, errors.New(v.Error()))
		}
	}
	if err != nil {
		return nil, err
	}
	if len(res) == 0 {
		return &propertyv1.QueryResponse{Properties: nil}, nil
	}
	properties := make([]*propertyv1.Property, 0, len(res))
	for _, p := range res {
		if len(req.TagProjection) > 0 {
			// Keep only the projected tags, preserving their original order.
			var tags []*modelv1.Tag
			for _, tag := range p.Tags {
				for _, tp := range req.TagProjection {
					if tp == tag.Key {
						tags = append(tags, tag)
						break
					}
				}
			}
			p.Tags = tags
		}
		properties = append(properties, p)
		if len(properties) >= int(req.Limit) {
			break
		}
	}
	return &propertyv1.QueryResponse{Properties: properties}, nil
}