in banyand/measure/metadata.go [64:135]
// OnAddOrUpdate translates an added or updated schema object into metadata
// events for the measure schema repository.
//
// The action depends on metadata.Kind:
//   - group: emits a group event, but only when the group's catalog is measure;
//   - measure: emits a resource event for the measure itself;
//   - index rule binding: when the binding's subject is in the measure catalog,
//     fetches that measure from the registry and emits a resource event for it;
//   - index rule: emits a resource event for every measure subject of the rule;
//   - topN aggregation: creates or updates the TopN measure first, then emits a
//     resource event for the source measure.
//
// Every other kind is ignored. A spec whose concrete type does not match its
// kind is logged and dropped instead of panicking the caller; lookup failures
// are logged and abort the handling of this notification.
func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
	switch metadata.Kind {
	case schema.KindGroup:
		g, ok := metadata.Spec.(*commonv1.Group)
		if !ok {
			sr.l.Warn().Msg("fail to convert message to Group")
			return
		}
		// The getter is nil-safe, so a typed-nil group is simply skipped.
		if g.GetCatalog() != commonv1.Catalog_CATALOG_MEASURE {
			return
		}
		sr.SendMetadataEvent(resourceSchema.MetadataEvent{
			Typ:      resourceSchema.EventAddOrUpdate,
			Kind:     resourceSchema.EventKindGroup,
			Metadata: g.GetMetadata(),
		})
	case schema.KindMeasure:
		m, ok := metadata.Spec.(*databasev1.Measure)
		if !ok {
			sr.l.Warn().Msg("fail to convert message to Measure")
			return
		}
		sr.SendMetadataEvent(resourceSchema.MetadataEvent{
			Typ:      resourceSchema.EventAddOrUpdate,
			Kind:     resourceSchema.EventKindResource,
			Metadata: m.GetMetadata(),
		})
	case schema.KindIndexRuleBinding:
		irb, ok := metadata.Spec.(*databasev1.IndexRuleBinding)
		if !ok {
			sr.l.Warn().Msg("fail to convert message to IndexRuleBinding")
			return
		}
		// Use the nil-safe getter chain: a binding without a subject must not
		// dereference a nil pointer.
		if irb.GetSubject().GetCatalog() != commonv1.Catalog_CATALOG_MEASURE {
			return
		}
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		stm, err := sr.metadata.MeasureRegistry().GetMeasure(ctx, &commonv1.Metadata{
			Name:  irb.GetSubject().GetName(),
			Group: metadata.Group,
		})
		// Release the timeout context as soon as the lookup is done.
		cancel()
		if err != nil {
			sr.l.Error().Err(err).Msg("fail to get subject")
			return
		}
		sr.SendMetadataEvent(resourceSchema.MetadataEvent{
			Typ:      resourceSchema.EventAddOrUpdate,
			Kind:     resourceSchema.EventKindResource,
			Metadata: stm.GetMetadata(),
		})
	case schema.KindIndexRule:
		rule, ok := metadata.Spec.(*databasev1.IndexRule)
		if !ok {
			sr.l.Warn().Msg("fail to convert message to IndexRule")
			return
		}
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		subjects, err := sr.metadata.Subjects(ctx, rule, commonv1.Catalog_CATALOG_MEASURE)
		if err != nil {
			sr.l.Error().Err(err).Msg("fail to get subjects(measure)")
			return
		}
		for _, sub := range subjects {
			m, isMeasure := sub.(*databasev1.Measure)
			if !isMeasure {
				// Skip an unexpected subject rather than aborting the
				// remaining, valid ones.
				sr.l.Warn().Msg("fail to convert subject to Measure")
				continue
			}
			sr.SendMetadataEvent(resourceSchema.MetadataEvent{
				Typ:      resourceSchema.EventAddOrUpdate,
				Kind:     resourceSchema.EventKindResource,
				Metadata: m.GetMetadata(),
			})
		}
	case schema.KindTopNAggregation:
		topN, ok := metadata.Spec.(*databasev1.TopNAggregation)
		if !ok {
			sr.l.Warn().Msg("fail to convert message to TopNAggregation")
			return
		}
		// createOrUpdate TopN schemas in advance
		if _, err := createOrUpdateTopNMeasure(sr.metadata.MeasureRegistry(), topN); err != nil {
			sr.l.Error().Err(err).Msg("fail to create/update topN measure")
			return
		}
		// reload source measure
		sr.SendMetadataEvent(resourceSchema.MetadataEvent{
			Typ:      resourceSchema.EventAddOrUpdate,
			Kind:     resourceSchema.EventKindResource,
			Metadata: topN.GetSourceMeasure(),
		})
	default:
		// Other schema kinds are not relevant here.
	}
}