in banyand/metadata/client.go [161:209]
func (s *clientService) Subjects(ctx context.Context, indexRule *databasev1.IndexRule, catalog commonv1.Catalog) ([]schema.Spec, error) {
bindings, err := s.schemaRegistry.ListIndexRuleBinding(ctx, schema.ListOpt{Group: indexRule.GetMetadata().GetGroup()})
if err != nil {
return nil, err
}
now := time.Now()
var subjectErr error
foundSubjects := make([]schema.Spec, 0)
for _, binding := range bindings {
if binding.GetBeginAt().AsTime().After(now) ||
binding.GetExpireAt().AsTime().Before(now) {
continue
}
sub := binding.GetSubject()
if sub.GetCatalog() != catalog {
continue
}
if !contains(binding.GetRules(), indexRule.GetMetadata().GetName()) {
continue
}
switch catalog {
case commonv1.Catalog_CATALOG_STREAM:
stream, getErr := s.schemaRegistry.GetStream(ctx, &commonv1.Metadata{
Name: sub.GetName(),
Group: indexRule.GetMetadata().GetGroup(),
})
if getErr != nil {
subjectErr = multierr.Append(subjectErr, getErr)
}
foundSubjects = append(foundSubjects, stream)
case commonv1.Catalog_CATALOG_MEASURE:
measure, getErr := s.schemaRegistry.GetMeasure(ctx, &commonv1.Metadata{
Name: sub.GetName(),
Group: indexRule.GetMetadata().GetGroup(),
})
if getErr != nil {
subjectErr = multierr.Append(subjectErr, getErr)
}
foundSubjects = append(foundSubjects, measure)
default:
continue
}
}
return foundSubjects, subjectErr
}