func createOrUpdateTopNMeasure()

in banyand/measure/metadata.go [137:205]


func createOrUpdateTopNMeasure(measureSchemaRegistry schema.Measure, topNSchema *databasev1.TopNAggregation) (*databasev1.Measure, error) {
	oldTopNSchema, err := measureSchemaRegistry.GetMeasure(context.TODO(), topNSchema.GetMetadata())
	if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) {
		return nil, err
	}

	sourceMeasureSchema, err := measureSchemaRegistry.GetMeasure(context.Background(), topNSchema.GetSourceMeasure())
	if err != nil {
		return nil, err
	}

	tagNames := sourceMeasureSchema.GetEntity().GetTagNames()
	seriesSpecs := make([]*databasev1.TagSpec, 0, len(tagNames))

	for _, tagName := range tagNames {
		var found bool
		for _, fSpec := range sourceMeasureSchema.GetTagFamilies() {
			for _, tSpec := range fSpec.GetTags() {
				if tSpec.GetName() == tagName {
					seriesSpecs = append(seriesSpecs, tSpec)
					found = true
					goto CHECK
				}
			}
		}

	CHECK:
		if !found {
			return nil, fmt.Errorf("fail to find tag spec %s", tagName)
		}
	}

	// create a new "derived" measure for TopN result
	newTopNMeasure := &databasev1.Measure{
		Metadata: topNSchema.GetMetadata(),
		Interval: sourceMeasureSchema.GetInterval(),
		TagFamilies: []*databasev1.TagFamilySpec{
			{
				Name: TopNTagFamily,
				Tags: append([]*databasev1.TagSpec{
					{
						Name: "measure_id",
						Type: databasev1.TagType_TAG_TYPE_STRING,
					},
				}, seriesSpecs...),
			},
		},
		Fields: []*databasev1.FieldSpec{TopNValueFieldSpec},
	}
	if oldTopNSchema == nil {
		if innerErr := measureSchemaRegistry.CreateMeasure(context.Background(), newTopNMeasure); innerErr != nil {
			return nil, innerErr
		}
		return newTopNMeasure, nil
	}
	// compare with the old one
	if cmp.Diff(newTopNMeasure, oldTopNSchema,
		protocmp.IgnoreUnknown(),
		protocmp.IgnoreFields(&databasev1.Measure{}, "updated_at"),
		protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"),
		protocmp.Transform()) == "" {
		return oldTopNSchema, nil
	}
	// update
	if err = measureSchemaRegistry.UpdateMeasure(context.Background(), newTopNMeasure); err != nil {
		return nil, err
	}
	return newTopNMeasure, nil
}