in banyand/measure/metadata.go [137:205]
// createOrUpdateTopNMeasure makes sure the registry holds a measure for the
// results of the given TopN aggregation, and that it matches the schema derived
// from the aggregation's source measure.
//
// The derived measure reuses the metadata of topNSchema and the interval of the
// source measure. It has a single tag family named TopNTagFamily that contains
// a "measure_id" string tag followed by the tag specs of the source measure's
// entity tags (in entity order), and a single field, TopNValueFieldSpec.
//
// It returns:
//   - the newly created measure when no measure was found under the TopN metadata;
//   - the already registered measure when it equals the derived one (ignoring
//     unknown fields, "updated_at" and the metadata's "id", "create_revision"
//     and "mod_revision");
//   - the derived measure after updating the registry otherwise.
//
// An error is returned when looking up the existing TopN measure fails for a
// reason other than schema.ErrGRPCResourceNotFound, when the source measure
// cannot be fetched, when an entity tag name has no matching tag spec in the
// source measure's tag families, or when the create/update call fails.
func createOrUpdateTopNMeasure(measureSchemaRegistry schema.Measure, topNSchema *databasev1.TopNAggregation) (*databasev1.Measure, error) {
	// The function receives no context from its caller, so every registry call
	// shares the same background context.
	ctx := context.Background()

	// A "not found" result is expected on first registration and is not an error.
	// NOTE(review): the create path below is keyed on the returned measure being
	// nil, so this presumably relies on the registry returning nil together with
	// the not-found error - confirm against the registry implementation.
	oldTopNSchema, err := measureSchemaRegistry.GetMeasure(ctx, topNSchema.GetMetadata())
	if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) {
		return nil, err
	}
	sourceMeasureSchema, err := measureSchemaRegistry.GetMeasure(ctx, topNSchema.GetSourceMeasure())
	if err != nil {
		return nil, err
	}

	// findTagSpec returns the first tag spec with the given name, scanning the
	// source measure's tag families in declaration order.
	findTagSpec := func(name string) (*databasev1.TagSpec, bool) {
		for _, family := range sourceMeasureSchema.GetTagFamilies() {
			for _, spec := range family.GetTags() {
				if spec.GetName() == name {
					return spec, true
				}
			}
		}
		return nil, false
	}

	// Tags of the derived measure: "measure_id" first, then one spec per entity
	// tag of the source measure.
	entityTagNames := sourceMeasureSchema.GetEntity().GetTagNames()
	topNTags := make([]*databasev1.TagSpec, 0, len(entityTagNames)+1)
	topNTags = append(topNTags, &databasev1.TagSpec{
		Name: "measure_id",
		Type: databasev1.TagType_TAG_TYPE_STRING,
	})
	for _, tagName := range entityTagNames {
		spec, ok := findTagSpec(tagName)
		if !ok {
			return nil, fmt.Errorf("fail to find tag spec %s", tagName)
		}
		topNTags = append(topNTags, spec)
	}

	// create a new "derived" measure for TopN result
	newTopNMeasure := &databasev1.Measure{
		Metadata: topNSchema.GetMetadata(),
		Interval: sourceMeasureSchema.GetInterval(),
		TagFamilies: []*databasev1.TagFamilySpec{
			{
				Name: TopNTagFamily,
				Tags: topNTags,
			},
		},
		Fields: []*databasev1.FieldSpec{TopNValueFieldSpec},
	}

	if oldTopNSchema == nil {
		if err := measureSchemaRegistry.CreateMeasure(ctx, newTopNMeasure); err != nil {
			return nil, err
		}
		return newTopNMeasure, nil
	}

	// Compare with the registered measure, ignoring fields that are not set on
	// the freshly derived schema. cmp.Equal is used instead of cmp.Diff because
	// only the yes/no answer is needed, not the textual report.
	if cmp.Equal(newTopNMeasure, oldTopNSchema,
		protocmp.IgnoreUnknown(),
		protocmp.IgnoreFields(&databasev1.Measure{}, "updated_at"),
		protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"),
		protocmp.Transform()) {
		return oldTopNSchema, nil
	}

	// The derived schema differs from the registered one: update the registry.
	if err := measureSchemaRegistry.UpdateMeasure(ctx, newTopNMeasure); err != nil {
		return nil, err
	}
	return newTopNMeasure, nil
}