in banyand/stream/metadata.go [56:114]
// OnAddOrUpdate reacts to an added or updated schema entry by forwarding the
// matching metadata event through SendMetadataEvent.
//
// Handling per kind:
//   - KindGroup: sent as a group event, but only when the group's catalog is
//     the stream catalog.
//   - KindStream: sent as a resource event carrying the stream's metadata.
//   - KindIndexRuleBinding: when the binding's subject is in the stream
//     catalog, the subject stream is fetched from the stream registry (using
//     the subject name and m.Group) and sent as a resource event.
//   - KindIndexRule: every subject returned by Subjects for the stream catalog
//     is sent as a resource event.
//
// Any other kind is ignored. A spec (or subject) whose concrete type does not
// match what the kind implies is logged as a warning and skipped instead of
// panicking, and lookup failures are logged as errors; nothing is returned to
// the caller.
func (sr *schemaRepo) OnAddOrUpdate(m schema.Metadata) {
	// Upper bound applied to each metadata lookup issued below.
	const lookupTimeout = 5 * time.Second

	switch m.Kind {
	case schema.KindGroup:
		g, ok := m.Spec.(*commonv1.Group)
		if !ok {
			sr.l.Warn().Msg("fail to convert message to Group")
			return
		}
		// The getter is nil-safe, so a nil group is simply treated as not
		// belonging to the stream catalog.
		if g.GetCatalog() != commonv1.Catalog_CATALOG_STREAM {
			return
		}
		sr.SendMetadataEvent(resourceSchema.MetadataEvent{
			Typ:      resourceSchema.EventAddOrUpdate,
			Kind:     resourceSchema.EventKindGroup,
			Metadata: g.GetMetadata(),
		})
	case schema.KindStream:
		stm, ok := m.Spec.(*databasev1.Stream)
		if !ok {
			sr.l.Warn().Msg("fail to convert message to Stream")
			return
		}
		sr.SendMetadataEvent(resourceSchema.MetadataEvent{
			Typ:      resourceSchema.EventAddOrUpdate,
			Kind:     resourceSchema.EventKindResource,
			Metadata: stm.GetMetadata(),
		})
	case schema.KindIndexRuleBinding:
		irb, ok := m.Spec.(*databasev1.IndexRuleBinding)
		if !ok {
			sr.l.Warn().Msg("fail to convert message to IndexRuleBinding")
			return
		}
		// Bindings whose subject is outside the stream catalog (or missing)
		// are not relevant here.
		if irb.GetSubject().GetCatalog() != commonv1.Catalog_CATALOG_STREAM {
			return
		}
		ctx, cancel := context.WithTimeout(context.Background(), lookupTimeout)
		stm, err := sr.metadata.StreamRegistry().GetStream(ctx, &commonv1.Metadata{
			Name:  irb.GetSubject().GetName(),
			Group: m.Group,
		})
		// Release the timer as soon as the lookup is done.
		cancel()
		if err != nil {
			sr.l.Error().Err(err).Msg("fail to get subject")
			return
		}
		sr.SendMetadataEvent(resourceSchema.MetadataEvent{
			Typ:      resourceSchema.EventAddOrUpdate,
			Kind:     resourceSchema.EventKindResource,
			Metadata: stm.GetMetadata(),
		})
	case schema.KindIndexRule:
		rule, ok := m.Spec.(*databasev1.IndexRule)
		if !ok {
			sr.l.Warn().Msg("fail to convert message to IndexRule")
			return
		}
		ctx, cancel := context.WithTimeout(context.Background(), lookupTimeout)
		subjects, err := sr.metadata.Subjects(ctx, rule, commonv1.Catalog_CATALOG_STREAM)
		// Release the timer as soon as the lookup is done.
		cancel()
		if err != nil {
			sr.l.Error().Err(err).Msg("fail to get subjects(stream)")
			return
		}
		for _, sub := range subjects {
			stm, ok := sub.(*databasev1.Stream)
			if !ok {
				// Skip the unexpected entry but keep notifying the rest.
				sr.l.Warn().Msg("fail to convert subject to Stream")
				continue
			}
			sr.SendMetadataEvent(resourceSchema.MetadataEvent{
				Typ:      resourceSchema.EventAddOrUpdate,
				Kind:     resourceSchema.EventKindResource,
				Metadata: stm.GetMetadata(),
			})
		}
	default:
	}
}